/*-------------------------------------------------------------------------
 *
 * bufmgr.c
 *	  buffer manager interface routines
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.195 2005/08/12 23:13:54 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
/*
 * ReadBuffer() -- find or create a buffer holding the requested page,
 *		and pin it so that no one can destroy it while this process
 *		is using it.
 *
 * ReleaseBuffer() -- unpin the buffer
 *
 * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
 *		but don't unpin.  The disk IO is delayed until buffer
 *		replacement.
 *
 * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
 *
 * BufferSync() -- flush all dirty buffers in the buffer pool.
 *
 * BgBufferSync() -- flush some dirty buffers in the buffer pool.
 *
 * InitBufferPool() -- Init the buffer module.
 *
 * See other files:
 *		freelist.c -- chooses victim for buffer replacement
 *		buf_table.c -- manages the buffer lookup table
 */
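/*
 * A typical caller might use these routines roughly as follows (an
 * illustrative sketch only, not a complete recipe; real callers add
 * error handling, and BufferGetPage() is declared elsewhere in the
 * buffer-manager headers):
 *
 *		Buffer		buf;
 *
 *		buf = ReadBuffer(reln, blkno);
 *		LockBuffer(buf, BUFFER_LOCK_SHARE);
 *		... examine the page via BufferGetPage(buf) ...
 *		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *		ReleaseBuffer(buf);
 *
 * or, when the page is modified,
 *
 *		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *		... modify the page, emit a WAL record for the change ...
 *		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *		WriteBuffer(buf);		marks the buffer dirty and unpins it
 */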
#include "postgres.h"

#include <sys/file.h>
#include <unistd.h>

#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/relcache.h"
#include "utils/resowner.h"
#include "pgstat.h"


/* Note: these two macros only work on shared buffers, not local ones! */
#define BufHdrGetBlock(bufHdr)	((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
#define BufferGetLSN(bufHdr)	(*((XLogRecPtr*) BufHdrGetBlock(bufHdr)))

/* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr)	\
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]


/* GUC variables */
bool		zero_damaged_pages = false;
double		bgwriter_lru_percent = 1.0;
double		bgwriter_all_percent = 0.333;
int			bgwriter_lru_maxpages = 5;
int			bgwriter_all_maxpages = 5;
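/*
 * For illustration, the defaults above correspond to postgresql.conf
 * settings along these lines (sample values only, not a tuning
 * recommendation):
 *
 *		bgwriter_lru_percent = 1.0
 *		bgwriter_lru_maxpages = 5
 *		bgwriter_all_percent = 0.333
 *		bgwriter_all_maxpages = 5
 *
 * BgBufferSync() below uses the *_percent settings to decide how many
 * buffers to scan per round, and the *_maxpages settings to cap how
 * many it actually writes.
 */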

long		NDirectFileRead;	/* some I/O's are direct file access.
								 * bypass bufmgr */
long		NDirectFileWrite;	/* e.g., I/O in psort and hashjoin. */


/* local state for StartBufferIO and related functions */
static BufferDesc *InProgressBuf = NULL;
static bool IsForInput;
/* local state for LockBufferForCleanup */
static BufferDesc *PinCountWaitBuf = NULL;


static bool PinBuffer(BufferDesc *buf);
static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK);
static bool SyncOneBuffer(int buf_id, bool skip_pinned);
static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
							  int set_flag_bits);
static void buffer_write_error_callback(void *arg);
static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
			bool *foundPtr);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
static void AtProcExit_Buffers(int code, Datum arg);
static void write_buffer(Buffer buffer, bool unpin);

/*
 * ReadBuffer -- returns a buffer containing the requested
 *		block of the requested relation.  If the blknum
 *		requested is P_NEW, extend the relation file and
 *		allocate a new block.  (Caller is responsible for
 *		ensuring that only one backend tries to extend a
 *		relation at the same time!)
 *
 * Returns: the buffer number for the buffer containing
 *		the block read.  The returned buffer has been pinned.
 *		Does not return on error --- elog's instead.
 *
 * Assume when this function is called, that reln has been
 *		opened already.
 */
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
	BufferDesc *bufHdr;
	Block		bufBlock;
	bool		found;
	bool		isExtend;
	bool		isLocalBuf;

	/* Make sure we will have room to remember the buffer pin */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	isExtend = (blockNum == P_NEW);
	isLocalBuf = reln->rd_istemp;

	/* Open it at the smgr level if not already done */
	RelationOpenSmgr(reln);

	/* Substitute proper block number if caller asked for P_NEW */
	if (isExtend)
		blockNum = smgrnblocks(reln->rd_smgr);

	pgstat_count_buffer_read(&reln->pgstat_info, reln);

	if (isLocalBuf)
	{
		ReadLocalBufferCount++;
		bufHdr = LocalBufferAlloc(reln, blockNum, &found);
		if (found)
			LocalBufferHitCount++;
	}
	else
	{
		ReadBufferCount++;

		/*
		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested
		 * block is not currently in memory.
		 */
		bufHdr = BufferAlloc(reln, blockNum, &found);
		if (found)
			BufferHitCount++;
	}

	/* At this point we do NOT hold any locks. */

	/* if it was already in the buffer pool, we're done */
	if (found)
	{
		/* Just need to update stats before we exit */
		pgstat_count_buffer_hit(&reln->pgstat_info, reln);

		if (VacuumCostActive)
			VacuumCostBalance += VacuumCostPageHit;

		return BufferDescriptorGetBuffer(bufHdr);
	}

	/*
	 * if we have gotten to this point, we have allocated a buffer for the
	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for
	 * it, if it's a shared buffer.
	 *
	 * Note: if smgrextend fails, we will end up with a buffer that is
	 * allocated but not marked BM_VALID.  P_NEW will still select the
	 * same block number (because the relation didn't get any longer on
	 * disk) and so future attempts to extend the relation will find the
	 * same buffer (if it's not been recycled) but come right back here to
	 * try smgrextend again.
	 */
	Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */

	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);

	if (isExtend)
	{
		/* new buffers are zero-filled */
		MemSet((char *) bufBlock, 0, BLCKSZ);
		smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock,
				   reln->rd_istemp);
	}
	else
	{
		smgrread(reln->rd_smgr, blockNum, (char *) bufBlock);
		/* check for garbage data */
		if (!PageHeaderIsValid((PageHeader) bufBlock))
		{
			/*
			 * During WAL recovery, the first access to any data page
			 * should overwrite the whole page from the WAL; so a
			 * clobbered page header is not reason to fail.  Hence, when
			 * InRecovery we may always act as though zero_damaged_pages
			 * is ON.
			 */
			if (zero_damaged_pages || InRecovery)
			{
				ereport(WARNING,
						(errcode(ERRCODE_DATA_CORRUPTED),
						 errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
							  blockNum, RelationGetRelationName(reln))));
				MemSet((char *) bufBlock, 0, BLCKSZ);
			}
			else
				ereport(ERROR,
						(errcode(ERRCODE_DATA_CORRUPTED),
						 errmsg("invalid page header in block %u of relation \"%s\"",
							  blockNum, RelationGetRelationName(reln))));
		}
	}

	if (isLocalBuf)
	{
		/* Only need to adjust flags */
		bufHdr->flags |= BM_VALID;
	}
	else
	{
		/* Set BM_VALID, terminate IO, and wake up any waiters */
		TerminateBufferIO(bufHdr, false, BM_VALID);
	}

	if (VacuumCostActive)
		VacuumCostBalance += VacuumCostPageMiss;

	return BufferDescriptorGetBuffer(bufHdr);
}

/*
 * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
 *		buffer.  If no buffer exists already, selects a replacement
 *		victim and evicts the old page, but does NOT read in new page.
247
 *
 * The returned buffer is pinned and is already marked as holding the
 * desired page.  If it already did have the desired page, *foundPtr is
 * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
 * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
 *
 * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
 * we keep it for simplicity in ReadBuffer.
255
 *
256
 * No locks are held either at entry or exit.
 */
static BufferDesc *
BufferAlloc(Relation reln,
260
			BlockNumber blockNum,
261
			bool *foundPtr)
262
{
263
	BufferTag	newTag;			/* identity of requested block */
	BufferTag	oldTag;
	BufFlags	oldFlags;
	int			buf_id;
	BufferDesc *buf;
	bool		valid;
269

270
	/* create a tag so we can lookup the buffer */
271
	INIT_BUFFERTAG(newTag, reln, blockNum);
272 273

	/* see if the block is in the buffer pool already */
	LWLockAcquire(BufMappingLock, LW_SHARED);
	buf_id = BufTableLookup(&newTag);
	if (buf_id >= 0)
277 278
	{
		/*
279
		 * Found it.  Now, pin the buffer so no one can steal it from the
280 281
		 * buffer pool, and check to see if the correct data has been
		 * loaded into the buffer.
282
		 */
		buf = &BufferDescriptors[buf_id];

		valid = PinBuffer(buf);
286

		/* Can release the mapping lock as soon as we've pinned it */
		LWLockRelease(BufMappingLock);

		*foundPtr = TRUE;
291

292
		if (!valid)
H
 
Hiroshi Inoue 已提交
293
		{
			/*
			 * We can only get here if (a) someone else is still reading
			 * in the page, or (b) a previous read attempt failed.  We
			 * have to wait for any active read attempt to finish, and
			 * then set up our own read attempt if the page is still not
			 * BM_VALID.  StartBufferIO does it all.
			 */
			if (StartBufferIO(buf, true))
			{
				/*
				 * If we get here, previous attempts to read the buffer
				 * must have failed ... but we shall bravely try again.
				 */
				*foundPtr = FALSE;
			}
309 310
		}

311
		return buf;
312 313
	}

314
	/*
315
	 * Didn't find it in the buffer pool.  We'll have to initialize a new
316
	 * buffer.  Remember to unlock BufMappingLock while doing the work.
317
	 */
318
	LWLockRelease(BufMappingLock);
319

	/* Loop here in case we have to try another victim buffer */
	for (;;)
	{
323
		/*
		 * Select a victim buffer.  The buffer is returned with its
		 * header spinlock still held!  Also the BufFreelistLock is
		 * still held, since it would be bad to hold the spinlock
		 * while possibly waking up other processes.
328
		 */
329 330
		buf = StrategyGetBuffer();

331 332
		Assert(buf->refcount == 0);

333 334
		/* Must copy buffer flags while we still hold the spinlock */
		oldFlags = buf->flags;
335

336 337
		/* Pin the buffer and then release the buffer spinlock */
		PinBuffer_Locked(buf);
B
Bruce Momjian 已提交
338

339 340
		/* Now it's safe to release the freelist lock */
		LWLockRelease(BufFreelistLock);
341

		/*
		 * If the buffer was dirty, try to write it out.  There is a race
		 * condition here, in that someone might dirty it after we released
		 * it above, or even while we are writing it out (since our share-lock
		 * won't prevent hint-bit updates).  We will recheck the dirty bit
		 * after re-locking the buffer header.
		 */
		if (oldFlags & BM_DIRTY)
		{
351
			/*
			 * We need a share-lock on the buffer contents to write it out
			 * (else we might write invalid data, eg because someone else
			 * is compacting the page contents while we write).  We must use
			 * a conditional lock acquisition here to avoid deadlock.  Even
			 * though the buffer was not pinned (and therefore surely not
			 * locked) when StrategyGetBuffer returned it, someone else could
			 * have pinned and exclusive-locked it by the time we get here.
			 * If we try to get the lock unconditionally, we'd block waiting
			 * for them; if they later block waiting for us, deadlock ensues.
			 * (This has been observed to happen when two backends are both
			 * trying to split btree index pages, and the second one just
			 * happens to be trying to split the page the first one got from
			 * StrategyGetBuffer.)
365
			 */
			if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
			{
				FlushBuffer(buf, NULL);
				LWLockRelease(buf->content_lock);
			}
			else
372 373
			{
				/*
374 375
				 * Someone else has pinned the buffer, so give it up and
				 * loop back to get another one.
376
				 */
				UnpinBuffer(buf, true, false /* evidently recently used */ );
				continue;
			}
		}
381

		/*
		 * Acquire exclusive mapping lock in preparation for changing
		 * the buffer's association.
		 */
		LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);
387

		/*
		 * Try to make a hashtable entry for the buffer under its new tag.
		 * This could fail because while we were writing someone else
		 * allocated another buffer for the same block we want to read in.
		 * Note that we have not yet removed the hashtable entry for the
		 * old tag.
		 */
		buf_id = BufTableInsert(&newTag, buf->buf_id);
396

		if (buf_id >= 0)
		{
			/*
			 * Got a collision. Someone has already done what we were about
			 * to do. We'll just handle this as if it were found in
			 * the buffer pool in the first place.	First, give up the
			 * buffer we were planning to use.  Don't allow it to be
			 * thrown in the free list (we don't want to hold both
			 * global locks at once).
			 */
			UnpinBuffer(buf, true, false);
408

409
			/* remaining code should match code at top of routine */
410

411
			buf = &BufferDescriptors[buf_id];
412

413
			valid = PinBuffer(buf);
414

415 416
			/* Can release the mapping lock as soon as we've pinned it */
			LWLockRelease(BufMappingLock);
417

			*foundPtr = TRUE;

			if (!valid)
421
			{
				/*
				 * We can only get here if (a) someone else is still reading
				 * in the page, or (b) a previous read attempt failed.  We
				 * have to wait for any active read attempt to finish, and
				 * then set up our own read attempt if the page is still not
				 * BM_VALID.  StartBufferIO does it all.
				 */
				if (StartBufferIO(buf, true))
				{
					/*
					 * If we get here, previous attempts to read the buffer
					 * must have failed ... but we shall bravely try again.
					 */
					*foundPtr = FALSE;
				}
437
			}
438 439

			return buf;
M
Marc G. Fournier 已提交
440
		}
441

		/*
		 * Need to lock the buffer header too in order to change its tag.
		 */
		LockBufHdr_NoHoldoff(buf);

		/*
		 * Somebody could have pinned or re-dirtied the buffer while we were
		 * doing the I/O and making the new hashtable entry.  If so, we
		 * can't recycle this buffer; we must undo everything we've done and
		 * start over with a new victim buffer.
		 */
		if (buf->refcount == 1 && !(buf->flags & BM_DIRTY))
			break;

		UnlockBufHdr_NoHoldoff(buf);
		BufTableDelete(&newTag);
		LWLockRelease(BufMappingLock);
		UnpinBuffer(buf, true, false /* evidently recently used */ );
	}
461 462

	/*
	 * Okay, it's finally safe to rename the buffer.
	 *
	 * Clearing BM_VALID here is necessary, clearing the dirtybits
	 * is just paranoia.  We also clear the usage_count since any
	 * recency of use of the old content is no longer relevant.
468
	 */
469 470
	oldTag = buf->tag;
	oldFlags = buf->flags;
471
	buf->tag = newTag;
472
	buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
	buf->flags |= BM_TAG_VALID;
	buf->usage_count = 0;

	UnlockBufHdr_NoHoldoff(buf);

	if (oldFlags & BM_TAG_VALID)
		BufTableDelete(&oldTag);

	LWLockRelease(BufMappingLock);
482

483
	/*
	 * Buffer contents are currently invalid.  Try to get the io_in_progress
	 * lock.  If StartBufferIO returns false, then someone else managed
	 * to read it before we did, so there's nothing left for BufferAlloc()
	 * to do.
488
	 */
489 490
	if (StartBufferIO(buf, true))
		*foundPtr = FALSE;
H
 
Hiroshi Inoue 已提交
491
	else
492
		*foundPtr = TRUE;
493

494
	return buf;
495 496
}

/*
 * InvalidateBuffer -- mark a shared buffer invalid and return it to the
 * freelist.
 *
 * The buffer header spinlock must be held at entry.  We drop it before
 * returning.  (This is sane because the caller must have locked the
 * buffer in order to be sure it should be dropped.)
 *
 * This is used only in contexts such as dropping a relation.  We assume
 * that no other backend could possibly be interested in using the page,
 * so the only reason the buffer might be pinned is if someone else is
 * trying to write it out.  We have to let them finish before we can
 * reclaim the buffer.
 *
 * The buffer could get reclaimed by someone else while we are waiting
 * to acquire the necessary locks; if so, don't mess it up.
 */
static void
InvalidateBuffer(BufferDesc *buf)
{
	BufferTag	oldTag;
	BufFlags	oldFlags;

	/* Save the original buffer tag before dropping the spinlock */
	oldTag = buf->tag;

	UnlockBufHdr(buf);

retry:
	/*
	 * Acquire exclusive mapping lock in preparation for changing
	 * the buffer's association.
	 */
	LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);

	/* Re-lock the buffer header (NoHoldoff since we have an LWLock) */
	LockBufHdr_NoHoldoff(buf);

	/* If it's changed while we were waiting for lock, do nothing */
	if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
	{
		UnlockBufHdr_NoHoldoff(buf);
		LWLockRelease(BufMappingLock);
		return;
	}

	/*
	 * We assume the only reason for it to be pinned is that someone else
	 * is flushing the page out.  Wait for them to finish.  (This could be
	 * an infinite loop if the refcount is messed up... it would be nice
	 * to time out after awhile, but there seems no way to be sure how
	 * many loops may be needed.  Note that if the other guy has pinned
	 * the buffer but not yet done StartBufferIO, WaitIO will fall through
	 * and we'll effectively be busy-looping here.)
	 */
	if (buf->refcount != 0)
	{
		UnlockBufHdr_NoHoldoff(buf);
		LWLockRelease(BufMappingLock);
		/* safety check: should definitely not be our *own* pin */
		if (PrivateRefCount[buf->buf_id] != 0)
			elog(ERROR, "buffer is pinned in InvalidateBuffer");
		WaitIO(buf);
		goto retry;
	}

	/*
	 * Clear out the buffer's tag and flags.  We must do this to ensure
	 * that linear scans of the buffer array don't think the buffer is valid.
	 */
	oldFlags = buf->flags;
	CLEAR_BUFFERTAG(buf->tag);
	buf->flags = 0;
	buf->usage_count = 0;

	UnlockBufHdr_NoHoldoff(buf);

	/*
	 * Remove the buffer from the lookup hashtable, if it was in there.
	 */
	if (oldFlags & BM_TAG_VALID)
		BufTableDelete(&oldTag);

	/*
	 * Avoid accepting a cancel interrupt when we release the mapping lock;
	 * that would leave the buffer free but not on the freelist.  (Which would
	 * not be fatal, since it'd get picked up again by the clock scanning
	 * code, but we'd rather be sure it gets to the freelist.)
	 */
	HOLD_INTERRUPTS();

	LWLockRelease(BufMappingLock);

	/*
	 * Insert the buffer at the head of the list of free buffers.
	 */
	StrategyFreeBuffer(buf, true);

	RESUME_INTERRUPTS();
}

598
/*
B
Bruce Momjian 已提交
599
 * write_buffer -- common functionality for
B
Bruce Momjian 已提交
600
 *				   WriteBuffer and WriteNoReleaseBuffer
601
 */
B
Bruce Momjian 已提交
602
static void
603
write_buffer(Buffer buffer, bool unpin)
604
{
605
	BufferDesc *bufHdr;
606

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer id: %d", buffer);

610
	if (BufferIsLocal(buffer))
B
Bruce Momjian 已提交
611
	{
612
		WriteLocalBuffer(buffer, unpin);
B
Bruce Momjian 已提交
613 614
		return;
	}
615

616
	bufHdr = &BufferDescriptors[buffer - 1];
617

618 619
	Assert(PrivateRefCount[buffer - 1] > 0);

620 621
	LockBufHdr(bufHdr);

622
	Assert(bufHdr->refcount > 0);
V
Vadim B. Mikheev 已提交
623

J
Jan Wieck 已提交
624
	/*
625
	 * If the buffer was not dirty already, do vacuum cost accounting.
J
Jan Wieck 已提交
626 627 628 629
	 */
	if (!(bufHdr->flags & BM_DIRTY) && VacuumCostActive)
		VacuumCostBalance += VacuumCostPageDirty;

630
	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
V
Vadim B. Mikheev 已提交
631

	UnlockBufHdr(bufHdr);

	if (unpin)
		UnpinBuffer(bufHdr, true, true);
B
Bruce Momjian 已提交
}

/*
 * WriteBuffer
 *
 *		Marks buffer contents as dirty (actual write happens later).
 *
643
 * Assume that buffer is pinned.  Assume that reln is valid.
B
Bruce Momjian 已提交
644 645 646 647 648 649 650 651
 *
 * Side Effects:
 *		Pin count is decremented.
 */
void
WriteBuffer(Buffer buffer)
{
	write_buffer(buffer, true);
652
}
653 654 655

/*
 * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer
656
 *						   when the operation is complete.
657
 */
B
Bruce Momjian 已提交
658
void
659 660
WriteNoReleaseBuffer(Buffer buffer)
{
B
Bruce Momjian 已提交
661
	write_buffer(buffer, false);
662 663 664 665 666
}

/*
 * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
 *
667 668 669 670 671
 * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
 * compared to calling the two routines separately.  Now it's mainly just
 * a convenience function.  However, if the passed buffer is valid and
 * already contains the desired block, we just return it as-is; and that
 * does save considerable work compared to a full release and reacquire.
672
 *
673
 * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
674 675
 * buffer actually needs to be released.  This case is the same as ReadBuffer,
 * but can save some tests in the caller.
676 677 678
 */
Buffer
ReleaseAndReadBuffer(Buffer buffer,
679
					 Relation relation,
680
					 BlockNumber blockNum)
681
{
682
	BufferDesc *bufHdr;
683

684
	if (BufferIsValid(buffer))
685
	{
686 687 688
		if (BufferIsLocal(buffer))
		{
			Assert(LocalRefCount[-buffer - 1] > 0);
689 690 691 692
			bufHdr = &LocalBufferDescriptors[-buffer - 1];
			if (bufHdr->tag.blockNum == blockNum &&
				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
				return buffer;
693
			ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
694
			LocalRefCount[-buffer - 1]--;
695 696 697
			if (LocalRefCount[-buffer - 1] == 0 &&
				bufHdr->usage_count < BM_MAX_USAGE_COUNT)
				bufHdr->usage_count++;
698 699
		}
		else
700 701
		{
			Assert(PrivateRefCount[buffer - 1] > 0);
702
			bufHdr = &BufferDescriptors[buffer - 1];
703
			/* we have pin, so it's ok to examine tag without spinlock */
704 705 706
			if (bufHdr->tag.blockNum == blockNum &&
				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
				return buffer;
707
			UnpinBuffer(bufHdr, true, true);
708
		}
709 710
	}

711
	return ReadBuffer(relation, blockNum);
712 713
}

714
/*
715
 * PinBuffer -- make buffer unavailable for replacement.
716
 *
717
 * This should be applied only to shared buffers, never local ones.
718 719
 *
 * Note that ResourceOwnerEnlargeBuffers must have been done already.
720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760
 *
 * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
 * some callers to avoid an extra spinlock cycle.
 */
static bool
PinBuffer(BufferDesc *buf)
{
	int			b = buf->buf_id;
	bool		result;

	if (PrivateRefCount[b] == 0)
	{
		/*
		 * Use NoHoldoff here because we don't want the unlock to be a
		 * potential place to honor a QueryCancel request.
		 * (The caller should be holding off interrupts anyway.)
		 */
		LockBufHdr_NoHoldoff(buf);
		buf->refcount++;
		result = (buf->flags & BM_VALID) != 0;
		UnlockBufHdr_NoHoldoff(buf);
	}
	else
	{
		/* If we previously pinned the buffer, it must surely be valid */
		result = true;
	}
	PrivateRefCount[b]++;
	Assert(PrivateRefCount[b] > 0);
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf));
	return result;
}

/*
 * PinBuffer_Locked -- as above, but caller already locked the buffer header.
 * The spinlock is released before return.
 *
 * Note: use of this routine is frequently mandatory, not just an optimization
 * to save a spin lock/unlock cycle, because we need to pin a buffer before
 * its state can change under us.
761 762
 */
static void
763
PinBuffer_Locked(BufferDesc *buf)
764
{
765
	int			b = buf->buf_id;
766 767 768

	if (PrivateRefCount[b] == 0)
		buf->refcount++;
769 770
	/* NoHoldoff since we mustn't accept cancel interrupt here */
	UnlockBufHdr_NoHoldoff(buf);
771 772
	PrivateRefCount[b]++;
	Assert(PrivateRefCount[b] > 0);
773 774 775 776
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf));
	/* Now we can accept cancel */
	RESUME_INTERRUPTS();
777 778 779 780 781 782
}

/*
 * UnpinBuffer -- make buffer available for replacement.
 *
 * This should be applied only to shared buffers, never local ones.
783 784
 *
 * Most but not all callers want CurrentResourceOwner to be adjusted.
785 786 787
 *
 * If we are releasing a buffer during VACUUM, and it's not been otherwise
 * used recently, and trashOK is true, send the buffer to the freelist.
788 789
 */
static void
790
UnpinBuffer(BufferDesc *buf, bool fixOwner, bool trashOK)
791
{
792
	int			b = buf->buf_id;
793

794 795 796 797
	if (fixOwner)
		ResourceOwnerForgetBuffer(CurrentResourceOwner,
								  BufferDescriptorGetBuffer(buf));

798 799 800
	Assert(PrivateRefCount[b] > 0);
	PrivateRefCount[b]--;
	if (PrivateRefCount[b] == 0)
801
	{
802 803
		bool	trash_buffer = false;

804
		/* I'd better not still hold any locks on the buffer */
805
		Assert(!LWLockHeldByMe(buf->content_lock));
806
		Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
807

808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829
		/* NoHoldoff ensures we don't lose control before sending signal */
		LockBufHdr_NoHoldoff(buf);

		/* Decrement the shared reference count */
		Assert(buf->refcount > 0);
		buf->refcount--;

		/* Mark the buffer recently used, unless we are in VACUUM */
		if (!strategy_hint_vacuum)
		{
			if (buf->usage_count < BM_MAX_USAGE_COUNT)
				buf->usage_count++;
		}
		else if (trashOK && 
				 buf->refcount == 0 &&
				 buf->usage_count == 0)
			trash_buffer = true;

		if ((buf->flags & BM_PIN_COUNT_WAITER) &&
			buf->refcount == 1)
		{
			/* we just released the last pin other than the waiter's */
830
			int		wait_backend_pid = buf->wait_backend_pid;
831 832 833

			buf->flags &= ~BM_PIN_COUNT_WAITER;
			UnlockBufHdr_NoHoldoff(buf);
834
			ProcSendSignal(wait_backend_pid);
835 836 837 838 839 840 841 842 843 844 845
		}
		else
			UnlockBufHdr_NoHoldoff(buf);

		/*
		 * If VACUUM is releasing an otherwise-unused buffer, send it to
		 * the freelist for near-term reuse.  We put it at the tail so that
		 * it won't be used before any invalid buffers that may exist.
		 */
		if (trash_buffer)
			StrategyFreeBuffer(buf, false);
846 847 848 849
	}
}

/*
850
 * BufferSync -- Write out all dirty buffers in the pool.
851
 *
852
 * This is called at checkpoint time to write out all dirty shared buffers.
853
 */
854 855
void
BufferSync(void)
856
{
857 858
	int			buf_id;
	int			num_to_scan;
859

J
Jan Wieck 已提交
860
	/*
861
	 * Find out where to start the circular scan.
J
Jan Wieck 已提交
862
	 */
863
	buf_id = StrategySyncStart();
864

865 866
	/* Make sure we can handle the pin inside SyncOneBuffer */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
J
Jan Wieck 已提交
867

J
Jan Wieck 已提交
868
	/*
869
	 * Loop over all buffers.
J
Jan Wieck 已提交
870
	 */
871 872
	num_to_scan = NBuffers;
	while (num_to_scan-- > 0)
J
Jan Wieck 已提交
873
	{
		(void) SyncOneBuffer(buf_id, false);
		if (++buf_id >= NBuffers)
			buf_id = 0;
J
Jan Wieck 已提交
877
	}
878
}
879

/*
 * BgBufferSync -- Write out some dirty buffers in the pool.
 *
 * This is called periodically by the background writer process.
 */
void
BgBufferSync(void)
{
	static int	buf_id1 = 0;
	int			buf_id2;
	int			num_to_scan;
	int			num_written;

	/* Make sure we can handle the pin inside SyncOneBuffer */
894 895
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

896
	/*
	 * To minimize work at checkpoint time, we want to try to keep all the
	 * buffers clean; this motivates a scan that proceeds sequentially through
	 * all buffers.  But we are also charged with ensuring that buffers that
	 * will be recycled soon are clean when needed; these buffers are the
	 * ones just ahead of the StrategySyncStart point.  We make a separate
	 * scan through those.
903
	 */
J
Jan Wieck 已提交
904

	/*
	 * This loop runs over all buffers, including pinned ones.  The
	 * starting point advances through the buffer pool on successive calls.
	 *
	 * Note that we advance the static counter *before* trying to write.
	 * This ensures that, if we have a persistent write failure on a dirty
	 * buffer, we'll still be able to make progress writing other buffers.
	 * (The bgwriter will catch the error and just call us again later.)
	 */
	if (bgwriter_all_percent > 0.0 && bgwriter_all_maxpages > 0)
	{
		num_to_scan = (int) ((NBuffers * bgwriter_all_percent + 99) / 100);
		num_written = 0;
918

919
		while (num_to_scan-- > 0)
920
		{
921 922
			if (++buf_id1 >= NBuffers)
				buf_id1 = 0;
			if (SyncOneBuffer(buf_id1, false))
			{
				if (++num_written >= bgwriter_all_maxpages)
					break;
			}
928
		}
929 930
	}

	/*
	 * This loop considers only unpinned buffers close to the clock sweep
	 * point.
	 */
	if (bgwriter_lru_percent > 0.0 && bgwriter_lru_maxpages > 0)
	{
		num_to_scan = (int) ((NBuffers * bgwriter_lru_percent + 99) / 100);
		num_written = 0;
J
Jan Wieck 已提交
939

940
		buf_id2 = StrategySyncStart();
941

		while (num_to_scan-- > 0)
		{
			if (SyncOneBuffer(buf_id2, true))
			{
				if (++num_written >= bgwriter_lru_maxpages)
					break;
			}
			if (++buf_id2 >= NBuffers)
				buf_id2 = 0;
		}
	}
V
Vadim B. Mikheev 已提交
953
}
954 955

/*
 * SyncOneBuffer -- process a single buffer during syncing.
 *
 * If skip_pinned is true, we don't write currently-pinned buffers, nor
 * buffers marked recently used, as these are not replacement candidates.
960
 *
 * Returns true if buffer was written, else false.  (This could be in error
 * if FlushBuffers finds the buffer clean after locking it, but we don't
 * care all that much.)
 *
 * Note: caller must have done ResourceOwnerEnlargeBuffers.
966
 */
967 968
static bool
SyncOneBuffer(int buf_id, bool skip_pinned)
969
{
970 971
	BufferDesc *bufHdr = &BufferDescriptors[buf_id];

H
 
Hiroshi Inoue 已提交
972
	/*
973
	 * Check whether buffer needs writing.
974
	 *
	 * We can make this check without taking the buffer content lock
	 * so long as we mark pages dirty in access methods *before* logging
	 * changes with XLogInsert(): if someone marks the buffer dirty
	 * just after our check we don't worry because our checkpoint.redo
	 * points before log record for upcoming changes and so we are not
	 * required to write such dirty buffer.
H
 
Hiroshi Inoue 已提交
981
	 */
982 983
	LockBufHdr(bufHdr);
	if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
H
 
Hiroshi Inoue 已提交
984
	{
985 986
		UnlockBufHdr(bufHdr);
		return false;
H
 
Hiroshi Inoue 已提交
987
	}
	if (skip_pinned &&
		(bufHdr->refcount != 0 || bufHdr->usage_count != 0))
	{
		UnlockBufHdr(bufHdr);
		return false;
	}

	/*
	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing
	 * if the buffer is clean by the time we've locked it.)
	 */
	PinBuffer_Locked(bufHdr);
	LWLockAcquire(bufHdr->content_lock, LW_SHARED);

	FlushBuffer(bufHdr, NULL);

	LWLockRelease(bufHdr->content_lock);
	UnpinBuffer(bufHdr, true, false /* don't change freelist */ );

	return true;
1008 1009
}

1010

/*
 * Return a palloc'd string containing buffer usage statistics.
 */
char *
ShowBufferUsage(void)
1016
{
1017
	StringInfoData str;
1018 1019
	float		hitrate;
	float		localhitrate;
1020

1021 1022
	initStringInfo(&str);

	if (ReadBufferCount == 0)
		hitrate = 0.0;
	else
		hitrate = (float) BufferHitCount *100.0 / ReadBufferCount;

	if (ReadLocalBufferCount == 0)
		localhitrate = 0.0;
	else
		localhitrate = (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount;

1033
	appendStringInfo(&str,
B
Bruce Momjian 已提交
1034
					 "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
1035
			ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate);
1036
	appendStringInfo(&str,
B
Bruce Momjian 已提交
1037 1038
					 "!\tLocal  blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
					 ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate);
1039
	appendStringInfo(&str,
B
Bruce Momjian 已提交
1040 1041
					 "!\tDirect blocks: %10ld read, %10ld written\n",
					 NDirectFileRead, NDirectFileWrite);
1042 1043

	return str.data;
1044 1045 1046
}

void
1047
ResetBufferUsage(void)
1048
{
	BufferHitCount = 0;
	ReadBufferCount = 0;
	BufferFlushCount = 0;
	LocalBufferHitCount = 0;
	ReadLocalBufferCount = 0;
	LocalBufferFlushCount = 0;
	NDirectFileRead = 0;
	NDirectFileWrite = 0;
1057 1058
}

1059 1060
/*
 *		AtEOXact_Buffers - clean up at end of transaction.
1061
 *
 *		As of PostgreSQL 8.0, buffer pins should get released by the
 *		ResourceOwner mechanism.  This routine is just a debugging
 *		cross-check that no pins remain.
1065 1066
 */
void
1067
AtEOXact_Buffers(bool isCommit)
1068
{
1069
#ifdef USE_ASSERT_CHECKING
1070
	if (assert_enabled)
1071
	{
		int			i;

		for (i = 0; i < NBuffers; i++)
		{
			Assert(PrivateRefCount[i] == 0);
		}
1078
	}
1079
#endif
1080 1081

	AtEOXact_LocalBuffers(isCommit);

	/* Make sure we reset the strategy hint in case VACUUM errored out */
	StrategyHintVacuum(false);
}

/*
 * InitBufferPoolBackend --- second-stage initialization of a new backend
 *
 * This is called after we have acquired a PGPROC and so can safely get
 * LWLocks.  We don't currently need to do anything at this stage ...
 * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
 * access, and thereby has to be called at the corresponding phase of
 * backend shutdown.
1095 1096
 */
void
InitBufferPoolBackend(void)
{
	on_shmem_exit(AtProcExit_Buffers, 0);
}

/*
 * Ensure we have released all shared-buffer locks and pins during backend exit
 */
static void
AtProcExit_Buffers(int code, Datum arg)
{
	int			i;

	AbortBufferIO();
	UnlockBuffers();

1113
	for (i = 0; i < NBuffers; i++)
1114
	{
1115
		if (PrivateRefCount[i] != 0)
		{
			BufferDesc *buf = &(BufferDescriptors[i]);

			/*
1120 1121
			 * We don't worry about updating ResourceOwner; if we even got
			 * here, it suggests that ResourceOwners are messed up.
1122 1123
			 */
			PrivateRefCount[i] = 1;		/* make sure we release shared pin */
1124
			UnpinBuffer(buf, false, false /* don't change freelist */ );
1125 1126
			Assert(PrivateRefCount[i] == 0);
		}
1127
	}

	/* localbuf.c needs a chance too */
	AtProcExit_LocalBuffers();
1131
}
1132

/*
 * Helper routine to issue warnings when a buffer is unexpectedly pinned
 */
void
PrintBufferLeakWarning(Buffer buffer)
{
	BufferDesc *buf;
	int32		loccount;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
	{
		buf = &LocalBufferDescriptors[-buffer - 1];
		loccount = LocalRefCount[-buffer - 1];
	}
	else
	{
		buf = &BufferDescriptors[buffer - 1];
		loccount = PrivateRefCount[buffer - 1];
	}

1154
	/* theoretically we should lock the bufhdr here */
	elog(WARNING,
		 "buffer refcount leak: [%03d] "
		 "(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
		 buffer,
		 buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
		 buf->tag.rnode.relNode,
		 buf->tag.blockNum, buf->flags,
		 buf->refcount, loccount);
1163 1164
}

1165
/*
V
Vadim B. Mikheev 已提交
1166
 * FlushBufferPool
1167
 *
1168 1169 1170
 * Flush all dirty blocks in buffer pool to disk at the checkpoint time.
 * Local relations do not participate in checkpoints, so they don't need to be
 * flushed.
1171 1172
 */
void
V
Vadim B. Mikheev 已提交
1173
FlushBufferPool(void)
1174
{
1175
	BufferSync();
V
Vadim B. Mikheev 已提交
1176 1177 1178
	smgrsync();
}

J
Jan Wieck 已提交
1179

V
Vadim B. Mikheev 已提交
1180
/*
1181
 * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
V
Vadim B. Mikheev 已提交
1182 1183 1184 1185
 */
void
BufmgrCommit(void)
{
1186
	/* Nothing to do in bufmgr anymore... */
B
Bruce Momjian 已提交
1187

V
Vadim B. Mikheev 已提交
1188
	smgrcommit();
1189 1190 1191
}

/*
B
Bruce Momjian 已提交
1192
 * BufferGetBlockNumber
1193
 *		Returns the block number associated with a buffer.
1194 1195
 *
 * Note:
1196 1197
 *		Assumes that the buffer is valid and pinned, else the
 *		value may be obsolete immediately...
 */
BlockNumber
BufferGetBlockNumber(Buffer buffer)
{
1202 1203
	BufferDesc *bufHdr;

1204
	Assert(BufferIsPinned(buffer));
1205

1206
	if (BufferIsLocal(buffer))
1207
		bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
1208
	else
		bufHdr = &BufferDescriptors[buffer - 1];

	/* pinned, so OK to read tag without spinlock */
	return bufHdr->tag.blockNum;
1213 1214
}

/*
 * BufferGetFileNode
 *		Returns the relation ID (RelFileNode) associated with a buffer.
 *
 * This should make the same checks as BufferGetBlockNumber, but since the
 * two are generally called together, we don't bother.
 */
RelFileNode
BufferGetFileNode(Buffer buffer)
{
	BufferDesc *bufHdr;

	if (BufferIsLocal(buffer))
		bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
	else
		bufHdr = &BufferDescriptors[buffer - 1];

1232
	return bufHdr->tag.rnode;
1233 1234
}

1235
/*
1236 1237
 * FlushBuffer
 *		Physically write out a shared buffer.
1238
 *
1239 1240 1241
 * NOTE: this actually just passes the buffer contents to the kernel; the
 * real write to disk won't happen until the kernel feels like it.  This
 * is okay from our point of view since we can redo the changes from WAL.
1242 1243
 * However, we will need to force the changes to disk via fsync before
 * we can checkpoint WAL.
1244
 *
 * The caller must hold a pin on the buffer and have share-locked the
 * buffer contents.  (Note: a share-lock does not prevent updates of
 * hint bits in the buffer, so the page could change while the write
 * is in progress, but we assume that that will not invalidate the data
 * written.)
1250 1251
 *
 * If the caller has an smgr reference for the buffer's relation, pass it
1252
 * as the second parameter.  If not, pass NULL.
1253
 */
1254
static void
1255
FlushBuffer(BufferDesc *buf, SMgrRelation reln)
1256
{
V
Vadim B. Mikheev 已提交
1257
	XLogRecPtr	recptr;
1258
	ErrorContextCallback errcontext;
1259

1260
	/*
	 * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
	 * false, then someone else flushed the buffer before we could, so
	 * we need not do anything.
1264
	 */
1265 1266
	if (!StartBufferIO(buf, false))
		return;
1267

1268 1269
	/* Setup error traceback support for ereport() */
	errcontext.callback = buffer_write_error_callback;
1270
	errcontext.arg = buf;
	errcontext.previous = error_context_stack;
	error_context_stack = &errcontext;

1274
	/* Find smgr relation for buffer */
	if (reln == NULL)
		reln = smgropen(buf->tag.rnode);

1278
	/*
1279
	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
	 * rule that log updates must hit disk before any of the data-file
	 * changes they describe do.
	 */
	recptr = BufferGetLSN(buf);
	XLogFlush(recptr);
1285

1286
	/*
B
Bruce Momjian 已提交
1287
	 * Now it's safe to write buffer to disk. Note that no one else should
1288 1289
	 * have been able to write it while we were busy with log flushing
	 * because we have the io_in_progress lock.
1290
	 */

	/* To check if block content changes while flushing. - vadim 01/17/97 */
	LockBufHdr_NoHoldoff(buf);
	buf->flags &= ~BM_JUST_DIRTIED;
	UnlockBufHdr_NoHoldoff(buf);

1297
	smgrwrite(reln,
1298
			  buf->tag.blockNum,
1299
			  (char *) BufHdrGetBlock(buf),
1300
			  false);
1301 1302

	BufferFlushCount++;
1303 1304

	/*
1305 1306
	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set)
	 * and end the io_in_progress state.
1307
	 */
	TerminateBufferIO(buf, true, 0);

	/* Pop the error context stack */
	error_context_stack = errcontext.previous;
}

/*
B
Bruce Momjian 已提交
1315
 * RelationGetNumberOfBlocks
1316
 *		Determines the current number of pages in the relation.
 */
BlockNumber
RelationGetNumberOfBlocks(Relation relation)
{
1321
	/* Open it at the smgr level if not already done */
1322
	RelationOpenSmgr(relation);
1323

1324
	return smgrnblocks(relation->rd_smgr);
1325 1326
}

1327
/*
1328 1329
 * RelationTruncate
 *		Physically truncate a relation to the specified number of blocks.
1330
 *
1331 1332
 * As of Postgres 8.1, this includes getting rid of any buffers for the
 * blocks that are to be dropped; previously, callers had to do that.
1333 1334
 */
void
1335
RelationTruncate(Relation rel, BlockNumber nblocks)
1336
{
1337
	/* Open it at the smgr level if not already done */
1338
	RelationOpenSmgr(rel);
1339

	/* Make sure rd_targblock isn't pointing somewhere past end */
	rel->rd_targblock = InvalidBlockNumber;

	/* Do the real work */
1344
	smgrtruncate(rel->rd_smgr, nblocks, rel->rd_istemp);
1345 1346
}

/* ---------------------------------------------------------------------
 *		DropRelFileNodeBuffers
 *
 *		This function removes from the buffer pool all the pages of the
 *		specified relation that have block numbers >= firstDelBlock.
 *		(In particular, with firstDelBlock = 0, all pages are removed.)
 *		Dirty pages are simply dropped, without bothering to write them
 *		out first.  Therefore, this is NOT rollback-able, and so should be
 *		used only with extreme caution!
1356
 *
 *		Currently, this is called only from smgr.c when the underlying file
 *		is about to be deleted or truncated (firstDelBlock is needed for
 *		the truncation case).  The data in the affected pages would therefore
 *		be deleted momentarily anyway, and there is no point in writing it.
 *		It is the responsibility of higher-level code to ensure that the
 *		deletion or truncation does not lose any data that could be needed
 *		later.  It is also the responsibility of higher-level code to ensure
 *		that no other process could be trying to load more pages of the
 *		relation into buffers.
 *
 *		XXX currently it sequentially searches the buffer pool, should be
 *		changed to more clever ways of searching.  However, this routine
 *		is used only in code paths that aren't very performance-critical,
 *		and we shouldn't slow down the hot paths to make it faster ...
 * --------------------------------------------------------------------
 */
void
1374 1375
DropRelFileNodeBuffers(RelFileNode rnode, bool istemp,
					   BlockNumber firstDelBlock)
1376
{
1377
	int			i;
1378
	BufferDesc *bufHdr;
1379

1380
	if (istemp)
	{
		for (i = 0; i < NLocBuffer; i++)
		{
1384
			bufHdr = &LocalBufferDescriptors[i];
1385 1386
			if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
				bufHdr->tag.blockNum >= firstDelBlock)
1387
			{
1388
				if (LocalRefCount[i] != 0)
1389
					elog(ERROR, "block %u of %u/%u/%u is still referenced (local %u)",
						 bufHdr->tag.blockNum,
						 bufHdr->tag.rnode.spcNode,
						 bufHdr->tag.rnode.dbNode,
						 bufHdr->tag.rnode.relNode,
						 LocalRefCount[i]);
				CLEAR_BUFFERTAG(bufHdr->tag);
				bufHdr->flags = 0;
				bufHdr->usage_count = 0;
1398
			}
1399
		}
1400
		return;
1401
	}
1402

1403
	for (i = 0; i < NBuffers; i++)
1404
	{
1405 1406
		bufHdr = &BufferDescriptors[i];
		LockBufHdr(bufHdr);
1407 1408
		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
			bufHdr->tag.blockNum >= firstDelBlock)
			InvalidateBuffer(bufHdr);		/* releases spinlock */
		else
			UnlockBufHdr(bufHdr);
	}
}

1415
/* ---------------------------------------------------------------------
1416
 *		DropBuffers
1417
 *
1418 1419
 *		This function removes all the buffers in the buffer cache for a
 *		particular database.  Dirty pages are simply dropped, without
B
Bruce Momjian 已提交
1420
 *		bothering to write them out first.	This is used when we destroy a
1421
 *		database, to avoid trying to flush data to disk when the directory
1422
 *		tree no longer exists.	Implementation is pretty similar to
1423
 *		DropRelFileNodeBuffers() which is for destroying just one relation.
 * --------------------------------------------------------------------
 */
void
DropBuffers(Oid dbid)
{
1429
	int			i;
1430
	BufferDesc *bufHdr;
1431

	/*
	 * We needn't consider local buffers, since by assumption the target
	 * database isn't our own.
	 */
1436

1437
	for (i = 0; i < NBuffers; i++)
1438
	{
1439 1440
		bufHdr = &BufferDescriptors[i];
		LockBufHdr(bufHdr);
1441
		if (bufHdr->tag.rnode.dbNode == dbid)
			InvalidateBuffer(bufHdr);		/* releases spinlock */
		else
			UnlockBufHdr(bufHdr);
1445
	}
}

/* -----------------------------------------------------------------
1449
 *		PrintBufferDescs
1450
 *
1451 1452
 *		this function prints all the buffer descriptors, for debugging
 *		use only.
1453 1454
 * -----------------------------------------------------------------
 */
1455
#ifdef NOT_USED
1456
void
1457
PrintBufferDescs(void)
1458
{
1459 1460
	int			i;
	BufferDesc *buf = BufferDescriptors;
1461

1462
	for (i = 0; i < NBuffers; ++i, ++buf)
1463
	{
		/* theoretically we should lock the bufhdr here */
		elog(LOG,
			 "[%02d] (freeNext=%d, rel=%u/%u/%u, "
			 "blockNum=%u, flags=0x%x, refcount=%u %d)",
			 i, buf->freeNext,
			 buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
			 buf->tag.rnode.relNode,
			 buf->tag.blockNum, buf->flags,
			 buf->refcount, PrivateRefCount[i]);
1473 1474
	}
}
1475
#endif
1476

1477
#ifdef NOT_USED
1478
void
1479
PrintPinnedBufs(void)
1480
{
1481 1482
	int			i;
	BufferDesc *buf = BufferDescriptors;

	for (i = 0; i < NBuffers; ++i, ++buf)
	{
		if (PrivateRefCount[i] > 0)
		{
			/* theoretically we should lock the bufhdr here */
			elog(LOG,
				 "[%02d] (freeNext=%d, rel=%u/%u/%u, "
1491
				 "blockNum=%u, flags=0x%x, refcount=%u %d)",
1492
				 i, buf->freeNext,
1493 1494
				 buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
				 buf->tag.rnode.relNode,
1495 1496
				 buf->tag.blockNum, buf->flags,
				 buf->refcount, PrivateRefCount[i]);
1497
		}
1498
	}
1499
}
1500
#endif
1501

1502
/* ---------------------------------------------------------------------
1503 1504
 *		FlushRelationBuffers
 *
 *		This function writes all dirty pages of a relation out to disk
 *		(or more accurately, out to kernel disk buffers), ensuring that the
 *		kernel has an up-to-date view of the relation.
1508
 *
 *		Generally, the caller should be holding AccessExclusiveLock on the
 *		target relation to ensure that no other backend is busy dirtying
 *		more blocks of the relation; the effects can't be expected to last
 *		after the lock is released.
1513
 *
1514
 *		XXX currently it sequentially searches the buffer pool, should be
 *		changed to more clever ways of searching.  This routine is not
 *		used in any performance-critical code paths, so it's not worth
 *		adding additional overhead to normal paths to make it go faster;
 *		but see also DropRelFileNodeBuffers.
1519 1520
 * --------------------------------------------------------------------
 */
1521
void
1522
FlushRelationBuffers(Relation rel)
1523
{
1524
	int			i;
1525
	BufferDesc *bufHdr;
1526

1527
	/* Open rel at the smgr level if not already done */
1528
	RelationOpenSmgr(rel);
1529

1530
	if (rel->rd_istemp)
	{
		for (i = 0; i < NLocBuffer; i++)
		{
1534
			bufHdr = &LocalBufferDescriptors[i];
1535 1536
			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
				(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
1537
			{
1538
				ErrorContextCallback errcontext;
1539

				/* Setup error traceback support for ereport() */
				errcontext.callback = buffer_write_error_callback;
				errcontext.arg = bufHdr;
				errcontext.previous = error_context_stack;
				error_context_stack = &errcontext;
1545

				smgrwrite(rel->rd_smgr,
						  bufHdr->tag.blockNum,
						  (char *) LocalBufHdrGetBlock(bufHdr),
						  true);
1550

1551
				bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
1552

1553 1554
				/* Pop the error context stack */
				error_context_stack = errcontext.previous;
1555 1556
			}
		}
1557

1558
		return;
1559 1560
	}

	/* Make sure we can handle the pin inside the loop */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

1564 1565
	for (i = 0; i < NBuffers; i++)
	{
1566
		bufHdr = &BufferDescriptors[i];
1567
		LockBufHdr(bufHdr);
1568 1569
		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
1570
		{
			PinBuffer_Locked(bufHdr);
			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
			FlushBuffer(bufHdr, rel->rd_smgr);
			LWLockRelease(bufHdr->content_lock);
			UnpinBuffer(bufHdr, true, false /* no freelist change */ );
1576
		}
1577 1578
		else
			UnlockBufHdr(bufHdr);
	}
}

1582
/*
1583 1584
 * ReleaseBuffer -- remove the pin on a buffer without
 *		marking it dirty.
1585
 */
1586
void
1587
ReleaseBuffer(Buffer buffer)
1588
{
1589
	BufferDesc *bufHdr;
1590

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer id: %d", buffer);

1594 1595
	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);

	if (BufferIsLocal(buffer))
	{
		Assert(LocalRefCount[-buffer - 1] > 0);
1599
		bufHdr = &LocalBufferDescriptors[-buffer - 1];
1600
		LocalRefCount[-buffer - 1]--;
		if (LocalRefCount[-buffer - 1] == 0 &&
			bufHdr->usage_count < BM_MAX_USAGE_COUNT)
			bufHdr->usage_count++;
1604
		return;
1605
	}

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);
1610

	if (PrivateRefCount[buffer - 1] > 1)
		PrivateRefCount[buffer - 1]--;
	else
1614
		UnpinBuffer(bufHdr, false, true);
1615 1616
}

/*
 * IncrBufferRefCount
 *		Increment the pin count on a buffer that we have *already* pinned
 *		at least once.
 *
 *		This function cannot be used on a buffer we do not have pinned,
1623
 *		because it doesn't change the shared buffer state.
 */
void
IncrBufferRefCount(Buffer buffer)
{
1628
	Assert(BufferIsPinned(buffer));
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
	ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
	if (BufferIsLocal(buffer))
		LocalRefCount[-buffer - 1]++;
	else
		PrivateRefCount[buffer - 1]++;
}

/*
 * SetBufferCommitInfoNeedsSave
 *
 *	Mark a buffer dirty when we have updated tuple commit-status bits in it.
 *
 * This is essentially the same as WriteNoReleaseBuffer.  We preserve the
 * distinction as a way of documenting that the caller has not made a critical
 * data change --- the status-bit update could be redone by someone else just
 * as easily.  Therefore, no WAL log record need be generated, whereas calls
 * to WriteNoReleaseBuffer really ought to be associated with a WAL-entry-
 * creating action.
 *
 * This routine might get called many times on the same page, if we are making
 * the first scan after commit of an xact that added/deleted many tuples.
1651
 * So, be as quick as we can if the buffer is already dirty.  We do this by
1652
 * not acquiring spinlock if it looks like the status bits are already OK.
 * (Note it is okay if someone else clears BM_JUST_DIRTIED immediately after
 * we look, because the buffer content update is already done and will be
 * reflected in the I/O.)
1656
 */
1657 1658
void
SetBufferCommitInfoNeedsSave(Buffer buffer)
1659
{
1660 1661
	BufferDesc *bufHdr;

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer id: %d", buffer);

1665
	if (BufferIsLocal(buffer))
1666 1667
	{
		WriteLocalBuffer(buffer, false);
1668
		return;
1669
	}

	bufHdr = &BufferDescriptors[buffer - 1];

1673 1674
	Assert(PrivateRefCount[buffer - 1] > 0);

	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
		(BM_DIRTY | BM_JUST_DIRTIED))
	{
1678
		LockBufHdr(bufHdr);
1679 1680
		Assert(bufHdr->refcount > 0);
		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1681
		UnlockBufHdr(bufHdr);
1682
	}
1683
}
V
Vadim B. Mikheev 已提交
1684

1685
/*
1686
 * Release buffer content locks for shared buffers.
1687 1688
 *
 * Used to clean up after errors.
1689 1690
 *
 * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
1691
 * of releasing buffer content locks per se; the only thing we need to deal
1692
 * with here is clearing any PIN_COUNT request that was in progress.
1693
 */
V
Vadim B. Mikheev 已提交
1694
void
1695
UnlockBuffers(void)
V
Vadim B. Mikheev 已提交
1696
{
1697
	BufferDesc *buf = PinCountWaitBuf;
V
Vadim B. Mikheev 已提交
1698

1699
	if (buf)
V
Vadim B. Mikheev 已提交
1700
	{
1701
		HOLD_INTERRUPTS();		/* don't want to die() partway through... */
1702

1703
		LockBufHdr_NoHoldoff(buf);
1704

1705
		/*
		 * Don't complain if flag bit not set; it could have been
		 * reset but we got a cancel/die interrupt before getting the
		 * signal.
1709
		 */
1710
		if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
1711
			buf->wait_backend_pid == MyProcPid)
1712
			buf->flags &= ~BM_PIN_COUNT_WAITER;
1713 1714

		UnlockBufHdr_NoHoldoff(buf);
1715

1716
		ProcCancelWaitForSignal();
1717

1718 1719
		PinCountWaitBuf = NULL;

1720
		RESUME_INTERRUPTS();
V
Vadim B. Mikheev 已提交
1721 1722 1723
	}
}

1724
/*
1725
 * Acquire or release the content_lock for the buffer.
1726
 */
V
Vadim B. Mikheev 已提交
1727
void
B
Bruce Momjian 已提交
1728
LockBuffer(Buffer buffer, int mode)
V
Vadim B. Mikheev 已提交
{
	BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return;

B
Bruce Momjian 已提交
1736
	buf = &(BufferDescriptors[buffer - 1]);
V
Vadim B. Mikheev 已提交
1737 1738

	if (mode == BUFFER_LOCK_UNLOCK)
1739
		LWLockRelease(buf->content_lock);
V
Vadim B. Mikheev 已提交
1740
	else if (mode == BUFFER_LOCK_SHARE)
1741
		LWLockAcquire(buf->content_lock, LW_SHARED);
V
Vadim B. Mikheev 已提交
1742 1743
	else if (mode == BUFFER_LOCK_EXCLUSIVE)
	{
1744
		LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
V
Vadim B. Mikheev 已提交
1745

1746
		/*
1747
		 * This is not the best place to mark buffer dirty (eg indices do
B
Bruce Momjian 已提交
1748
		 * not always change buffer they lock in excl mode). But please
1749 1750
		 * remember that it's critical to set dirty bit *before* logging
		 * changes with XLogInsert() - see comments in SyncOneBuffer().
1751
		 */
1752 1753 1754
		LockBufHdr_NoHoldoff(buf);
		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
		UnlockBufHdr_NoHoldoff(buf);
V
Vadim B. Mikheev 已提交
1755 1756
	}
	else
1757
		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
V
Vadim B. Mikheev 已提交
1758
}
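
/*
 * Illustrative sketch (editorial example, not part of the original file):
 * the usual read-only access pattern pins the buffer first, then takes and
 * releases the content lock around the page examination:
 *
 *		buf = ReadBuffer(reln, blkno);
 *		LockBuffer(buf, BUFFER_LOCK_SHARE);
 *		... inspect the page via BufferGetPage(buf) ...
 *		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *		ReleaseBuffer(buf);
 *
 * Writers use BUFFER_LOCK_EXCLUSIVE instead, and must WAL-log their changes
 * while still holding the exclusive lock.
 */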

/*
 * Acquire the content_lock for the buffer, but only if we don't have to wait.
 *
 * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
 */
bool
ConditionalLockBuffer(Buffer buffer)
{
	BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return true;			/* act as though we got it */

	buf = &(BufferDescriptors[buffer - 1]);

	if (LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE))
	{
		/*
		 * This is not the best place to mark the buffer dirty (e.g.,
		 * indexes do not always change the buffer they lock in exclusive
		 * mode).  But remember that it's critical to set the dirty bit
		 * *before* logging changes with XLogInsert() --- see comments in
		 * SyncOneBuffer().
		 */
		LockBufHdr_NoHoldoff(buf);
		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
		UnlockBufHdr_NoHoldoff(buf);

		return true;
	}
	return false;
}
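
/*
 * Illustrative sketch (editorial example, not part of the original file):
 * a caller that must not block --- for instance one already holding other
 * locks --- can try for the exclusive lock and fall back if it is busy:
 *
 *		if (ConditionalLockBuffer(buffer))
 *		{
 *			... modify the page and write a WAL record ...
 *			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 *		}
 *		else
 *		{
 *			... skip this page, or retry later ...
 *		}
 */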

/*
 * LockBufferForCleanup - lock a buffer in preparation for deleting items
 *
 * Items may be deleted from a disk page only when the caller (a) holds an
 * exclusive lock on the buffer and (b) has observed that no other backend
 * holds a pin on the buffer.  If there is a pin, then the other backend
 * might have a pointer into the buffer (for example, a heapscan reference
 * to an item --- see README for more details).  It's OK if a pin is added
 * after the cleanup starts, however; the newly-arrived backend will be
 * unable to look at the page until we release the exclusive lock.
 *
 * To implement this protocol, a would-be deleter must pin the buffer and
 * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
 * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
 * it has successfully observed pin count = 1.
 */
void
LockBufferForCleanup(Buffer buffer)
{
	BufferDesc *bufHdr;

	Assert(BufferIsValid(buffer));
	Assert(PinCountWaitBuf == NULL);

	if (BufferIsLocal(buffer))
	{
		/* There should be exactly one pin */
		if (LocalRefCount[-buffer - 1] != 1)
			elog(ERROR, "incorrect local pin count: %d",
				 LocalRefCount[-buffer - 1]);
		/* Nobody else to wait for */
		return;
	}

	/* There should be exactly one local pin */
	if (PrivateRefCount[buffer - 1] != 1)
		elog(ERROR, "incorrect local pin count: %d",
			 PrivateRefCount[buffer - 1]);

	bufHdr = &BufferDescriptors[buffer - 1];

	for (;;)
	{
		/* Try to acquire lock */
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		LockBufHdr_NoHoldoff(bufHdr);
		Assert(bufHdr->refcount > 0);
		if (bufHdr->refcount == 1)
		{
			/* Successfully acquired exclusive lock with pincount 1 */
			UnlockBufHdr_NoHoldoff(bufHdr);
			return;
		}
		/* Failed, so mark myself as waiting for pincount 1 */
		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
		{
			UnlockBufHdr_NoHoldoff(bufHdr);
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			elog(ERROR, "multiple backends attempting to wait for pincount 1");
		}
		bufHdr->wait_backend_pid = MyProcPid;
		bufHdr->flags |= BM_PIN_COUNT_WAITER;
		PinCountWaitBuf = bufHdr;
		UnlockBufHdr_NoHoldoff(bufHdr);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		/* Wait to be signaled by UnpinBuffer() */
		ProcWaitForSignal();
		PinCountWaitBuf = NULL;
		/* Loop back and try again */
	}
}
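
/*
 * Illustrative sketch (editorial example, not part of the original file):
 * a VACUUM-style caller that wants to remove dead items from a page pins
 * the buffer, waits for the cleanup lock, and only then deletes items.
 * "onerel" and "blkno" are placeholder names:
 *
 *		buf = ReadBuffer(onerel, blkno);
 *		LockBufferForCleanup(buf);
 *		... remove dead tuples; no other backend can hold a pointer into
 *		    the page, because we observed pin count == 1 ...
 *		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *		ReleaseBuffer(buf);
 */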

/*
 *	Functions for buffer I/O handling
 *
 *	Note: We assume that nested buffer I/O never occurs;
 *	i.e., at most one io_in_progress lock is held per process.
 *
 *	Also note that these are used only for shared buffers, not local ones.
 */

/*
 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
 */
static void
WaitIO(BufferDesc *buf)
{
	/*
	 * Changed to wait until there's no IO - Inoue 01/13/2000
	 *
	 * Note this is *necessary* because an error abort in the process doing
	 * I/O could release the io_in_progress_lock prematurely. See
	 * AbortBufferIO.
	 */
	for (;;)
	{
		BufFlags	sv_flags;

		/*
		 * It may not be necessary to acquire the spinlock to check the
		 * flag here, but since this test is essential for correctness,
		 * we'd better play it safe.
		 */
		LockBufHdr(buf);
		sv_flags = buf->flags;
		UnlockBufHdr(buf);
		if (!(sv_flags & BM_IO_IN_PROGRESS))
			break;
		LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
		LWLockRelease(buf->io_in_progress_lock);
	}
}

/*
 * StartBufferIO: begin I/O on this buffer
 *	(Assumptions)
 *	My process is executing no IO
 *	The buffer is Pinned
 *
 * In some scenarios there are race conditions in which multiple backends
 * could attempt the same I/O operation concurrently.  If someone else
 * has already started I/O on this buffer then we will block on the
 * io_in_progress lock until he's done.
 *
 * Input operations are only attempted on buffers that are not BM_VALID,
 * and output operations only on buffers that are BM_VALID and BM_DIRTY,
 * so we can always tell if the work is already done.
 *
 * Returns TRUE if we successfully marked the buffer as I/O busy,
 * FALSE if someone else already did the work.
 */
static bool
StartBufferIO(BufferDesc *buf, bool forInput)
{
	Assert(!InProgressBuf);

	for (;;)
	{
		/*
		 * Grab the io_in_progress lock so that other processes can wait for
		 * me to finish the I/O.
		 */
		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

		/* NoHoldoff is OK since we now have an LWLock */
		LockBufHdr_NoHoldoff(buf);

		if (!(buf->flags & BM_IO_IN_PROGRESS))
			break;

		/*
		 * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
		 * lock isn't held is if the process doing the I/O is recovering from
		 * an error (see AbortBufferIO).  If that's the case, we must wait for
		 * him to get unwedged.
		 */
		UnlockBufHdr_NoHoldoff(buf);
		LWLockRelease(buf->io_in_progress_lock);
		WaitIO(buf);
	}

	/* Once we get here, there is definitely no I/O active on this buffer */

	if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
	{
		/* someone else already did the I/O */
		UnlockBufHdr_NoHoldoff(buf);
		LWLockRelease(buf->io_in_progress_lock);
		return false;
	}

	buf->flags |= BM_IO_IN_PROGRESS;

	UnlockBufHdr_NoHoldoff(buf);

	InProgressBuf = buf;
	IsForInput = forInput;

	return true;
}

/*
 * TerminateBufferIO: release a buffer we were doing I/O on
 *	(Assumptions)
 *	My process is executing IO for the buffer
 *	BM_IO_IN_PROGRESS bit is set for the buffer
 *	We hold the buffer's io_in_progress lock
 *	The buffer is Pinned
 *
 * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
 * buffer's BM_DIRTY flag.  This is appropriate when terminating a
 * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
 * marking the buffer clean if it was re-dirtied while we were writing.
 *
 * set_flag_bits gets ORed into the buffer's flags.  It must include
 * BM_IO_ERROR in a failure case.  For successful completion it could
 * be 0, or BM_VALID if we just finished reading in the page.
 */
static void
TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
{
	Assert(buf == InProgressBuf);

	/* NoHoldoff is OK since we must have an LWLock */
	LockBufHdr_NoHoldoff(buf);

	Assert(buf->flags & BM_IO_IN_PROGRESS);
	buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
	if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
		buf->flags &= ~BM_DIRTY;
	buf->flags |= set_flag_bits;

	UnlockBufHdr_NoHoldoff(buf);

	InProgressBuf = NULL;

	LWLockRelease(buf->io_in_progress_lock);
}
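
/*
 * Illustrative sketch (editorial example, not part of the original file):
 * the read path uses the StartBufferIO/TerminateBufferIO pair roughly like
 * this (details such as the smgrread() arguments are elided):
 *
 *		if (StartBufferIO(buf, true))
 *		{
 *			... smgrread() the block into the buffer ...
 *			TerminateBufferIO(buf, false, BM_VALID);
 *		}
 *		else
 *		{
 *			... some other backend already read it in; just use it ...
 *		}
 *
 * A write works the same way with forInput = false, passing clear_dirty =
 * true and set_flag_bits = 0 on successful completion.
 */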

/*
 * AbortBufferIO: Clean up any active buffer I/O after an error.
 *
 *	All LWLocks we might have held have been released,
 *	but we haven't yet released buffer pins, so the buffer is still pinned.
 *
 *	If I/O was in progress, we always set BM_IO_ERROR, even though it's
 *	possible the error condition wasn't related to the I/O.
 */
void
AbortBufferIO(void)
{
	BufferDesc *buf = InProgressBuf;

	if (buf)
	{
		/*
		 * Since LWLockReleaseAll has already been called, we're not
		 * holding the buffer's io_in_progress_lock. We have to re-acquire
		 * it so that we can use TerminateBufferIO. Anyone who's executing
		 * WaitIO on the buffer will be in a busy spin until we succeed in
		 * doing this.
		 */
		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

		/* NoHoldoff is OK since we now have an LWLock */
		LockBufHdr_NoHoldoff(buf);
		Assert(buf->flags & BM_IO_IN_PROGRESS);
		if (IsForInput)
		{
			Assert(!(buf->flags & BM_DIRTY));
			/* We'd better not think buffer is valid yet */
			Assert(!(buf->flags & BM_VALID));
			UnlockBufHdr_NoHoldoff(buf);
		}
		else
		{
			BufFlags	sv_flags;

			sv_flags = buf->flags;
			Assert(sv_flags & BM_DIRTY);
			UnlockBufHdr_NoHoldoff(buf);
			/* Issue notice if this is not the first failure... */
			if (sv_flags & BM_IO_ERROR)
			{
				/* Buffer is pinned, so we can read tag without spinlock */
				ereport(WARNING,
						(errcode(ERRCODE_IO_ERROR),
						 errmsg("could not write block %u of %u/%u/%u",
								buf->tag.blockNum,
								buf->tag.rnode.spcNode,
								buf->tag.rnode.dbNode,
								buf->tag.rnode.relNode),
						 errdetail("Multiple failures --- write error may be permanent.")));
			}
		}
		TerminateBufferIO(buf, false, BM_IO_ERROR);
	}
}

/*
 * Error context callback for errors occurring during buffer writes.
 */
static void
buffer_write_error_callback(void *arg)
{
	BufferDesc *bufHdr = (BufferDesc *) arg;

	/* Buffer is pinned, so we can read the tag without locking the spinlock */
	if (bufHdr != NULL)
		errcontext("writing block %u of relation %u/%u/%u",
				   bufHdr->tag.blockNum,
				   bufHdr->tag.rnode.spcNode,
				   bufHdr->tag.rnode.dbNode,
				   bufHdr->tag.rnode.relNode);
}
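
/*
 * Illustrative sketch (editorial example, not part of the original file):
 * the write path pushes this callback onto error_context_stack around the
 * actual I/O, so any error reported while writing names the block.  The
 * local variable name "errcontext" is a placeholder:
 *
 *		ErrorContextCallback errcontext;
 *
 *		errcontext.callback = buffer_write_error_callback;
 *		errcontext.arg = (void *) buf;
 *		errcontext.previous = error_context_stack;
 *		error_context_stack = &errcontext;
 *
 *		... smgrwrite() the page ...
 *
 *		error_context_stack = errcontext.previous;
 */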