Commit fc929b37 authored by Heikki Linnakangas

Refactor away duplicated code in ReadBuffer_Resync()

ReadBuffer_Resync() and BufferAlloc_Resync() were not materially different
from plain ReadBuffer() and BufferAlloc(), so just use the latter directly.

In passing, also:
* rename ReadBuffer_Internal to ReadBuffer_common, as it's called in
  newer upstream versions, and drop the _SMgr suffix from BufferAlloc
  and LocalBufferAlloc to match upstream as well.
* revert some spurious whitespace and comment differences vs. upstream.
Parent 05cf1fb0
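
For orientation before the diff: after this change, ReadBuffer_Resync() is a
thin wrapper over the shared read path instead of carrying its own copy of the
buffer-allocation logic. A minimal sketch, assembled from the added lines in
bufmgr.c below (the explanatory comments are mine):

    Buffer
    ReadBuffer_Resync(SMgrRelation reln, BlockNumber blockNum, const char *relidstr)
    {
        bool isHit;

        /*
         * Delegate to the common path shared with ReadBuffer(); relidstr
         * only identifies the relation in error messages.
         */
        return ReadBuffer_common(reln,
                                 false,   /* isLocalBuf */
                                 false,   /* isTemp */
                                 blockNum,
                                 relidstr,
                                 &isHit);
    }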
@@ -25,6 +25,8 @@
#include "utils/faultinjector.h"
#include "utils/relcache.h"
#define OIDCHARS 10 /* max chars printed by %u */
static int FileRepPrimary_RunResyncWorker(void);
static int FileRepPrimary_ResyncWrite(FileRepResyncHashEntry_s *entry);
static int FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request);
@@ -193,6 +195,7 @@ FileRepPrimary_ResyncWrite(FileRepResyncHashEntry_s *entry)
BlockNumber numBlocks;
BlockNumber blkno;
SMgrRelation smgr_relation;
char relidstr[OIDCHARS + 1 + OIDCHARS + 1 + OIDCHARS + 1];
XLogRecPtr loc;
int count = 0;
int thresholdCount = 0;
@@ -212,14 +215,15 @@ FileRepPrimary_ResyncWrite(FileRepResyncHashEntry_s *entry)
numBlocks = smgrnblocks(smgr_relation);
if (Debug_filerep_print)
elog(LOG, "resync buffer pool relation '%u/%u/%u' "
"number of blocks '%d' ",
snprintf(relidstr, sizeof(relidstr), "%u/%u/%u",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
numBlocks);
smgr_relation->smgr_rnode.relNode);
if (Debug_filerep_print)
elog(LOG, "resync buffer pool relation '%s' number of blocks '%d' ",
relidstr, numBlocks);
thresholdCount = Min(numBlocks, 1024);
/*
@@ -245,7 +249,7 @@ FileRepPrimary_ResyncWrite(FileRepResyncHashEntry_s *entry)
#endif
FileRepResync_SetReadBufferRequest();
buf = ReadBuffer_Resync(smgr_relation, blkno);
buf = ReadBuffer_Resync(smgr_relation, blkno, relidstr);
FileRepResync_ResetReadBufferRequest();
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
@@ -256,11 +260,9 @@ FileRepPrimary_ResyncWrite(FileRepResyncHashEntry_s *entry)
if (Debug_filerep_print)
{
elog(LOG,
"full resync buffer pool identifier '%u/%u/%u' num blocks '%d' blkno '%d' lsn begin change tracking '%s(%u/%u)' "
"full resync buffer pool identifier '%s' num blocks '%d' blkno '%d' lsn begin change tracking '%s(%u/%u)' "
"lsn page '%s(%u/%u)' lsn end change tracking '%s(%u/%u)' ",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
relidstr,
numBlocks,
blkno,
XLogLocationToString(&entry->mirrorBufpoolResyncCkptLoc),
@@ -278,10 +280,8 @@ FileRepPrimary_ResyncWrite(FileRepResyncHashEntry_s *entry)
char tmpBuf[FILEREP_MAX_LOG_DESCRIPTION_LEN];
snprintf(tmpBuf, sizeof(tmpBuf),
"full resync buffer pool identifier '%u/%u/%u' num blocks '%d' blkno '%d' lsn begin change tracking '%s(%u/%u)' ",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
"full resync buffer pool identifier '%s' num blocks '%d' blkno '%d' lsn begin change tracking '%s(%u/%u)' ",
relidstr,
numBlocks,
blkno,
XLogLocationToString(&entry->mirrorBufpoolResyncCkptLoc),
@@ -291,10 +291,8 @@ FileRepPrimary_ResyncWrite(FileRepResyncHashEntry_s *entry)
FileRep_InsertConfigLogEntry(tmpBuf);
snprintf(tmpBuf, sizeof(tmpBuf),
"full resync buffer pool identifier '%u/%u/%u' lsn page '%s(%u/%u)' lsn end change tracking '%s(%u/%u)' ",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
"full resync buffer pool identifier '%s' lsn page '%s(%u/%u)' lsn end change tracking '%s(%u/%u)' ",
relidstr,
XLogLocationToString(&loc),
loc.xlogid,
loc.xrecoff,
@@ -547,6 +545,7 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
Buffer buf;
BlockNumber numBlocks = 0;
SMgrRelation smgr_relation = NULL;
char relidstr[OIDCHARS + 1 + OIDCHARS + 1 + OIDCHARS + 1];
int ii;
XLogRecPtr loc;
XLogRecPtr loc1;
@@ -575,6 +574,11 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
smgr_relation = smgropen(result->entries[ii].relFileNode);
snprintf(relidstr, sizeof(relidstr), "%u/%u/%u",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode);
numBlocks = smgrnblocks(smgr_relation);
if (Debug_filerep_print)
@@ -596,12 +600,10 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
if (result->entries[ii].block_num >= numBlocks)
{
ereport(LOG,
(errmsg("could not resynchonize buffer pool relation '%u/%u/%u' block '%d' (maybe due to truncate), "
(errmsg("could not resynchonize buffer pool relation '%s' block '%d' (maybe due to truncate), "
"lsn change tracking '%s(%u/%u)' "
"number of blocks '%d' ",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
relidstr,
result->entries[ii].block_num,
XLogLocationToString(&loc1),
loc1.xlogid,
@@ -615,7 +617,8 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
/* allow flushing buffers from buffer pool during scan */
FileRepResync_SetReadBufferRequest();
buf = ReadBuffer_Resync(smgr_relation,
result->entries[ii].block_num);
result->entries[ii].block_num,
relidstr);
FileRepResync_ResetReadBufferRequest();
Assert(result->entries[ii].block_num < numBlocks);
@@ -628,11 +631,9 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
if(Debug_filerep_print)
{
elog(LOG,
"incremental resync buffer pool identifier '%u/%u/%u' num blocks '%d' blkno '%d' lsn page '%s(%u/%u)' "
"incremental resync buffer pool identifier '%s' num blocks '%d' blkno '%d' lsn page '%s(%u/%u)' "
"lsn end change tracking '%s(%u/%u)' ",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
relidstr,
numBlocks,
result->entries[ii].block_num,
XLogLocationToString(&loc),
@@ -647,10 +648,8 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
char tmpBuf[FILEREP_MAX_LOG_DESCRIPTION_LEN];
snprintf(tmpBuf, sizeof(tmpBuf),
"incremental resync buffer pool identifier '%u/%u/%u' num blocks '%d' blkno '%d' lsn page '%s(%u/%u)' ",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
"incremental resync buffer pool identifier '%s' num blocks '%d' blkno '%d' lsn page '%s(%u/%u)' ",
relidstr,
numBlocks,
result->entries[ii].block_num,
XLogLocationToString(&loc),
@@ -660,10 +659,8 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
FileRep_InsertConfigLogEntry(tmpBuf);
snprintf(tmpBuf, sizeof(tmpBuf),
"incremental resync buffer pool identifier '%u/%u/%u' lsn end change tracking '%s(%u/%u)' ",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
"incremental resync buffer pool identifier '%s' lsn end change tracking '%s(%u/%u)' ",
relidstr,
XLogLocationToString(&loc1),
result->entries[ii].lsn_end.xlogid,
result->entries[ii].lsn_end.xrecoff);
@@ -677,12 +674,10 @@ FileRepPrimary_ResyncBufferPoolIncrementalWrite(ChangeTrackingRequest *request)
if (! XLByteEQ(PageGetLSN(page), result->entries[ii].lsn_end))
{
ereport(LOG,
(errmsg("Resynchonize buffer pool relation '%u/%u/%u' block '%d' has page lsn less than CT lsn, "
(errmsg("Resynchonize buffer pool relation '%s' block '%d' has page lsn less than CT lsn, "
"lsn end change tracking '%s(%u/%u)' lsn page '%s(%u/%u)' "
"number of blocks '%d'",
smgr_relation->smgr_rnode.spcNode,
smgr_relation->smgr_rnode.dbNode,
smgr_relation->smgr_rnode.relNode,
relidstr,
result->entries[ii].block_num,
XLogLocationToString(&loc),
loc.xlogid,
......
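
An aside on the relidstr buffer introduced above: its declared size follows
directly from the OIDCHARS definition. A worked sketch (the declaration and
snprintf are from the diff; the arithmetic in the comment is mine):

    /*
     * "spcNode/dbNode/relNode": each OID prints as at most OIDCHARS (10)
     * characters via %u, plus two '/' separators and a trailing NUL:
     * 10 + 1 + 10 + 1 + 10 + 1 = 33 bytes.
     */
    char relidstr[OIDCHARS + 1 + OIDCHARS + 1 + OIDCHARS + 1];

    snprintf(relidstr, sizeof(relidstr), "%u/%u/%u",
             smgr_relation->smgr_rnode.spcNode,
             smgr_relation->smgr_rnode.dbNode,
             smgr_relation->smgr_rnode.relNode);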
@@ -88,10 +88,9 @@ static bool IsForInput;
/* local state for LockBufferForCleanup */
static volatile BufferDesc *PinCountWaitBuf = NULL;
static volatile BufferDesc *
BufferAlloc_Resync(SMgrRelation reln, //Relation reln,
BlockNumber blockNum,
bool *foundPtr);
static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf,
bool isTemp, BlockNumber blockNum,
const char *relErrMsgString, bool *pHit);
static bool PinBuffer(volatile BufferDesc *buf);
static void PinBuffer_Locked(volatile BufferDesc *buf);
@@ -104,20 +103,13 @@ static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
int set_flag_bits);
static void buffer_write_error_callback(void *arg);
static volatile BufferDesc *BufferAlloc_SMgr(SMgrRelation smgr,
BlockNumber blockNum,
bool *foundPtr);
static volatile BufferDesc *BufferAlloc(SMgrRelation reln, BlockNumber blockNum,
bool *foundPtr);
static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
static void AtProcExit_Buffers(int code, Datum arg);
static Buffer ReadBuffer_Ex(Relation reln, BlockNumber blockNum, volatile BufferDesc* availBufHdr);
static Buffer ReadBuffer_Internal(SMgrRelation smgr,
BlockNumber blockNum,
bool isLocalBuf,
bool isTemp,
char * relErrMsgString,
bool *pHit);
#define ShouldMemoryProtect(buf) (ShouldMemoryProtectBufferPool() && ! BufferIsLocal(buf->buf_id+1) && ! BufferIsInvalid(buf->buf_id+1))
@@ -193,6 +185,22 @@ ReadBuffer(Relation reln, BlockNumber blockNum)
return ReadBuffer_Ex(reln, blockNum, NULL);
}
/*
* Read Buffer for pages to be Resynced
*/
Buffer
ReadBuffer_Resync(SMgrRelation reln, BlockNumber blockNum, const char *relidstr)
{
bool isHit;
return ReadBuffer_common(reln,
false, /* isLocalBuf */
false, /* isTemp */
blockNum,
relidstr,
&isHit);
}
/*
* ReadBuffer_Ex -- returns a buffer containing the requested
* block of the requested relation. If the blknum
@@ -205,11 +213,11 @@ ReadBuffer(Relation reln, BlockNumber blockNum)
* the block read. The returned buffer has been pinned.
* Does not return on error --- elog's instead.
*
* Assume when this function is called, that reln has been opened already.
*
* Assume when this function is called, that reln has been
* opened already.
*/
static Buffer
ReadBuffer_Ex(Relation reln, BlockNumber blockNum, volatile BufferDesc* availBufHdr)
ReadBuffer_Ex(Relation reln, BlockNumber blockNum, volatile BufferDesc *availBufHdr)
{
bool isHit;
Buffer returnBuffer;
@@ -220,11 +228,12 @@ ReadBuffer_Ex(Relation reln, BlockNumber blockNum, volatile BufferDesc* availBuf
pgstat_count_buffer_read(reln);
returnBuffer =
ReadBuffer_Internal(reln->rd_smgr, blockNum,
reln->rd_isLocalBuf,
reln->rd_istemp,RelationGetRelationName(reln),
&isHit);
returnBuffer = ReadBuffer_common(reln->rd_smgr,
reln->rd_isLocalBuf,
reln->rd_istemp,
blockNum,
NULL,
&isHit);
if (isHit)
pgstat_count_buffer_hit(reln);
@@ -233,7 +242,7 @@ ReadBuffer_Ex(Relation reln, BlockNumber blockNum, volatile BufferDesc* availBuf
}
/*
* ReadBuffer_Internal -- returns a buffer containing the requested
* ReadBuffer_common -- returns a buffer containing the requested
* block of the requested relation. If the blknum
* requested is P_NEW, extend the relation file and
* allocate a new block. (Caller is responsible for
@@ -248,21 +257,23 @@ ReadBuffer_Ex(Relation reln, BlockNumber blockNum, volatile BufferDesc* availBuf
*
*/
static Buffer
ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
bool isLocalBuf,
bool isTemp, char * relErrMsgString,
bool *pHit)
ReadBuffer_common(SMgrRelation reln,
bool isLocalBuf,
bool isTemp,
BlockNumber blockNum,
const char *relErrMsgString,
bool *pHit)
{
//MIRROREDLOCK_BUFMGR_DECLARE;
volatile BufferDesc* bufHdr;
volatile BufferDesc *bufHdr;
Block bufBlock;
bool found;
bool isExtend;
bool found;
*pHit = false;
Assert(smgr != NULL);
Assert(reln != NULL);
MIRROREDLOCK_BUFMGR_MUST_ALREADY_BE_HELD;
/* Make sure we will have room to remember the buffer pin */
@@ -270,12 +281,9 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
isExtend = (blockNum == P_NEW);
/* Open it at the smgr level if not already done */
// RelationOpenSmgrDirect(smgr);
/* Substitute proper block number if caller asked for P_NEW */
if (isExtend)
blockNum = smgrnblocks(smgr);
blockNum = smgrnblocks(reln);
// pgstat_count_buffer_read(reln);
@@ -286,7 +294,7 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
if (isLocalBuf)
{
ReadLocalBufferCount++;
bufHdr = LocalBufferAlloc_SMgr(smgr, blockNum, &found);
bufHdr = LocalBufferAlloc(reln, blockNum, &found);
if (found)
LocalBufferHitCount++;
}
@@ -298,7 +306,7 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
* lookup the buffer. IO_IN_PROGRESS is set if the requested block is
* not currently in memory.
*/
bufHdr = BufferAlloc_SMgr(smgr, blockNum, &found);
bufHdr = BufferAlloc(reln, blockNum, &found);
if (found)
BufferHitCount++;
}
@@ -345,8 +353,8 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
if (isLocalBuf)
{
/* Only need to adjust flags */
Assert((bufHdr)->flags & BM_VALID);
(bufHdr)->flags &= ~BM_VALID;
Assert((bufHdr)->flags & BM_VALID);
bufHdr->flags &= ~BM_VALID;
}
else
{
@@ -358,8 +366,8 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
do
{
LockBufHdr(bufHdr);
Assert((bufHdr)->flags & BM_VALID);
(bufHdr)->flags &= ~BM_VALID;
Assert(bufHdr->flags & BM_VALID);
bufHdr->flags &= ~BM_VALID;
UnlockBufHdr(bufHdr);
} while (!StartBufferIO(bufHdr, true));
}
@@ -377,7 +385,7 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
* it's not been recycled) but come right back here to try smgrextend
* again.
*/
Assert(!((bufHdr)->flags & BM_VALID)); /* spinlock not needed */
Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
@@ -387,12 +395,12 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
{
/* new buffers are zero-filled */
MemSet((char *) bufBlock, 0, BLCKSZ);
smgrextend(smgr, blockNum, (char *) bufBlock,
smgrextend(reln, blockNum, (char *) bufBlock,
isTemp);
}
else
{
smgrread(smgr, blockNum, (char *) bufBlock);
smgrread(reln, blockNum, (char *) bufBlock);
/* check for garbage data */
if (!PageHeaderIsValid((PageHeader) bufBlock))
{
@@ -424,7 +432,7 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
if (isLocalBuf)
{
/* Only need to adjust flags */
(bufHdr)->flags |= BM_VALID;
bufHdr->flags |= BM_VALID;
}
else
{
@@ -443,410 +451,7 @@ ReadBuffer_Internal(SMgrRelation smgr, BlockNumber blockNum,
}
/*
* Read Buffer for pages to be Resynced
*/
Buffer
ReadBuffer_Resync(SMgrRelation reln, BlockNumber blockNum)
{
volatile BufferDesc* bufHdr;
Block bufBlock;
bool found;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
ReadBufferCount++;
// ----------------------------------
// No MirroredLock acquistion needed here.
// ----------------------------------
/*
* lookup the buffer. IO_IN_PROGRESS is set if the requested block is
* not currently in memory.
*/
bufHdr = BufferAlloc_Resync(reln, blockNum, &found);
if (found)
BufferHitCount++;
/* At this point we do NOT hold any locks. */
/* if it was already in the buffer pool, we're done */
if (found)
{
goto done;
}
/*
* if we have gotten to this point, we have allocated a buffer for the
* page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
* if it's a shared buffer.
*
* Note: if smgrextend fails, we will end up with a buffer that is
* allocated but not marked BM_VALID. P_NEW will still select the same
* block number (because the relation didn't get any longer on disk) and
* so future attempts to extend the relation will find the same buffer (if
* it's not been recycled) but come right back here to try smgrextend
* again.
*/
Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */
bufBlock = BufHdrGetBlock(bufHdr);
BufferMProtect( bufHdr, PROT_WRITE | PROT_READ );
smgrread(reln,
blockNum, (char *) bufBlock);
/* check for garbage data */
if (!PageHeaderIsValid((PageHeader) bufBlock))
{
/*
* During WAL recovery, the first access to any data page should
* overwrite the whole page from the WAL; so a clobbered page
* header is not reason to fail. Hence, when InRecovery we may
* always act as though zero_damaged_pages is ON.
*/
if (zero_damaged_pages || InRecovery)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page header in block %u of relation '%u/%u/%u'; zeroing out page",
blockNum,
reln->smgr_rnode.spcNode,
reln->smgr_rnode.dbNode,
reln->smgr_rnode.relNode)));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page header in block %u of relation '%u/%u/%u'",
blockNum,
reln->smgr_rnode.spcNode,
reln->smgr_rnode.dbNode,
reln->smgr_rnode.relNode),
errSendAlert(true)));
}
BufferMProtect( bufHdr, PROT_READ );
/* Set BM_VALID, terminate IO, and wake up any waiters */
TerminateBufferIO(bufHdr, false, BM_VALID);
done:
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageMiss;
return BufferDescriptorGetBuffer(bufHdr);
}
static volatile BufferDesc *
BufferAlloc_Resync(SMgrRelation reln,
BlockNumber blockNum,
bool *foundPtr)
{
BufferTag newTag; /* identity of requested block */
uint32 newHash; /* hash value for newTag */
LWLockId newPartitionLock; /* buffer partition lock for it */
BufferTag oldTag; /* previous identity of selected buffer */
uint32 oldHash; /* hash value for oldTag */
LWLockId oldPartitionLock; /* buffer partition lock for it */
BufFlags oldFlags;
int buf_id;
volatile BufferDesc *buf;
bool valid;
// ----------------------------------
// No MirroredLock acquistion needed here.
// ----------------------------------
/* create a tag so we can lookup the buffer */
newTag.rnode = reln->smgr_rnode;
newTag.blockNum = blockNum;
/* determine its hash code and partition lock ID */
newHash = BufTableHashCode(&newTag);
newPartitionLock = BufMappingPartitionLock(newHash);
/* see if the block is in the buffer pool already */
LWLockAcquire(newPartitionLock, LW_SHARED);
buf_id = BufTableLookup(&newTag, newHash);
if (buf_id >= 0)
{
/*
* Found it. Now, pin the buffer so no one can steal it from the
* buffer pool, and check to see if the correct data has been loaded
* into the buffer.
*/
buf = &BufferDescriptors[buf_id];
valid = PinBuffer(buf);
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(newPartitionLock);
*foundPtr = TRUE;
if (!valid)
{
/*
* We can only get here if (a) someone else is still reading in
* the page, or (b) a previous read attempt failed. We have to
* wait for any active read attempt to finish, and then set up our
* own read attempt if the page is still not BM_VALID.
* StartBufferIO does it all.
*/
if (StartBufferIO(buf, true))
{
/*
* If we get here, previous attempts to read the buffer must
* have failed ... but we shall bravely try again.
*/
*foundPtr = FALSE;
}
}
return buf;
}
/*
* Didn't find it in the buffer pool. We'll have to initialize a new
* buffer. Remember to unlock the mapping lock while doing the work.
*/
LWLockRelease(newPartitionLock);
/* Loop here in case we have to try another victim buffer */
for (;;)
{
/*
* Select a victim buffer. The buffer is returned with its header
* spinlock still held! Also (in most cases) the BufFreelistLock is
* still held, since it would be bad to hold the spinlock while
* possibly waking up other processes.
*/
buf = StrategyGetBuffer();
Assert(buf->refcount == 0);
/* Must copy buffer flags while we still hold the spinlock */
oldFlags = buf->flags;
/* Pin the buffer and then release the buffer spinlock */
PinBuffer_Locked(buf);
/* Now it's safe to release the freelist lock */
LWLockRelease(BufFreelistLock);
/*
* If the buffer was dirty, try to write it out. There is a race
* condition here, in that someone might dirty it after we released it
* above, or even while we are writing it out (since our share-lock
* won't prevent hint-bit updates). We will recheck the dirty bit
* after re-locking the buffer header.
*/
if (oldFlags & BM_DIRTY ||
(flush_buffer_pages_when_evicted && (oldFlags & BM_VALID)))
{
/*
* We need a share-lock on the buffer contents to write it out
* (else we might write invalid data, eg because someone else is
* compacting the page contents while we write). We must use a
* conditional lock acquisition here to avoid deadlock. Even
* though the buffer was not pinned (and therefore surely not
* locked) when StrategyGetBuffer returned it, someone else could
* have pinned and exclusive-locked it by the time we get here. If
* we try to get the lock unconditionally, we'd block waiting for
* them; if they later block waiting for us, deadlock ensues.
* (This has been observed to happen when two backends are both
* trying to split btree index pages, and the second one just
* happens to be trying to split the page the first one got from
* StrategyGetBuffer.)
*/
if ( ConditionalAcquireContentLock(buf, LW_SHARED))
{
FlushBuffer(buf, NULL);
ReleaseContentLock(buf);
}
else
{
/*
* Someone else has pinned the buffer, so give it up and loop
* back to get another one.
*/
UnpinBuffer(buf, true, false /* evidently recently used */ );
continue;
}
}
/*
* To change the association of a valid buffer, we'll need to have
* exclusive lock on both the old and new mapping partitions.
*/
if (oldFlags & BM_TAG_VALID)
{
/*
* Need to compute the old tag's hashcode and partition lock ID.
* XXX is it worth storing the hashcode in BufferDesc so we need
* not recompute it here? Probably not.
*/
oldTag = buf->tag;
oldHash = BufTableHashCode(&oldTag);
oldPartitionLock = BufMappingPartitionLock(oldHash);
/*
* Must lock the lower-numbered partition first to avoid
* deadlocks.
*/
if (oldPartitionLock < newPartitionLock)
{
LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
}
else if (oldPartitionLock > newPartitionLock)
{
LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
}
else
{
/* only one partition, only one lock */
LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
}
}
else
{
/* if it wasn't valid, we need only the new partition */
LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
/* these just keep the compiler quiet about uninit variables */
oldHash = 0;
oldPartitionLock = 0;
}
/*
* Try to make a hashtable entry for the buffer under its new tag.
* This could fail because while we were writing someone else
* allocated another buffer for the same block we want to read in.
* Note that we have not yet removed the hashtable entry for the old
* tag.
*/
buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
if (buf_id >= 0)
{
/*
* Got a collision. Someone has already done what we were about to
* do. We'll just handle this as if it were found in the buffer
* pool in the first place. First, give up the buffer we were
* planning to use. Don't allow it to be thrown in the free list
* (we don't want to hold freelist and mapping locks at once).
*/
UnpinBuffer(buf, true, false);
/* Can give up that buffer's mapping partition lock now */
if ((oldFlags & BM_TAG_VALID) &&
oldPartitionLock != newPartitionLock)
LWLockRelease(oldPartitionLock);
/* remaining code should match code at top of routine */
buf = &BufferDescriptors[buf_id];
valid = PinBuffer(buf);
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(newPartitionLock);
*foundPtr = TRUE;
if (!valid)
{
/*
* We can only get here if (a) someone else is still reading
* in the page, or (b) a previous read attempt failed. We
* have to wait for any active read attempt to finish, and
* then set up our own read attempt if the page is still not
* BM_VALID. StartBufferIO does it all.
*/
if (StartBufferIO(buf, true))
{
/*
* If we get here, previous attempts to read the buffer
* must have failed ... but we shall bravely try again.
*/
*foundPtr = FALSE;
}
}
return buf;
}
/*
* Need to lock the buffer header too in order to change its tag.
*/
LockBufHdr(buf);
/*
* Somebody could have pinned or re-dirtied the buffer while we were
* doing the I/O and making the new hashtable entry. If so, we can't
* recycle this buffer; we must undo everything we've done and start
* over with a new victim buffer.
*/
oldFlags = buf->flags;
if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
break;
UnlockBufHdr(buf);
BufTableDelete(&newTag, newHash);
if ((oldFlags & BM_TAG_VALID) &&
oldPartitionLock != newPartitionLock)
LWLockRelease(oldPartitionLock);
LWLockRelease(newPartitionLock);
UnpinBuffer(buf, true, false /* evidently recently used */ );
}
/*
* Okay, it's finally safe to rename the buffer.
*
* Clearing BM_VALID here is necessary, clearing the dirtybits is just
* paranoia. We also reset the usage_count since any recency of use of
* the old content is no longer relevant.
*/
buf->tag = newTag;
buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
buf->flags |= BM_TAG_VALID;
buf->usage_count = 0;
UnlockBufHdr(buf);
if (oldFlags & BM_TAG_VALID)
{
BufTableDelete(&oldTag, oldHash);
if (oldPartitionLock != newPartitionLock)
LWLockRelease(oldPartitionLock);
}
LWLockRelease(newPartitionLock);
/*
* Buffer contents are currently invalid. Try to get the io_in_progress
* lock. If StartBufferIO returns false, then someone else managed to
* read it before we did, so there's nothing left for BufferAlloc() to do.
*/
if (StartBufferIO(buf, true))
*foundPtr = FALSE;
else
*foundPtr = TRUE;
return buf;
}
/*
* BufferAlloc_SMgr -- subroutine for ReadBuffer. Handles lookup of a shared
* BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared
* buffer. If no buffer exists already, selects a replacement
* victim and evicts the old page, but does NOT read in new page.
*
@@ -861,7 +466,9 @@ BufferAlloc_Resync(SMgrRelation reln,
* No locks are held either at entry or exit.
*/
static volatile BufferDesc *
BufferAlloc_SMgr(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
BufferAlloc(SMgrRelation smgr,
BlockNumber blockNum,
bool *foundPtr)
{
BufferTag newTag; /* identity of requested block */
uint32 newHash; /* hash value for newTag */
@@ -874,6 +481,7 @@ BufferAlloc_SMgr(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
volatile BufferDesc *buf;
bool valid;
/* create a tag so we can lookup the buffer */
//INIT_BUFFERTAG(newTag, reln, blockNum);
newTag.rnode = smgr->smgr_rnode;
newTag.blockNum = blockNum;
@@ -932,8 +540,6 @@ BufferAlloc_SMgr(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
/* Loop here in case we have to try another victim buffer */
for (;;)
{
bool unlockBufFreeList = false;
/*
* Select a victim buffer. The buffer is returned with its header
* spinlock still held! Also (in most cases) the BufFreelistLock is
@@ -941,7 +547,6 @@ BufferAlloc_SMgr(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
* possibly waking up other processes.
*/
buf = StrategyGetBuffer();
unlockBufFreeList = true;
Assert(buf->refcount == 0);
@@ -952,8 +557,7 @@ BufferAlloc_SMgr(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
PinBuffer_Locked(buf);
/* Now it's safe to release the freelist lock */
if (unlockBufFreeList)
LWLockRelease(BufFreelistLock);
LWLockRelease(BufFreelistLock);
/*
* If the buffer was dirty, try to write it out. There is a race
......
@@ -60,7 +60,7 @@ static Block GetLocalBufferStorage(void);
* does not get set.
*/
BufferDesc *
LocalBufferAlloc_SMgr(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
{
BufferTag newTag; /* identity of requested block */
LocalBufferLookupEnt *hresult;
......
@@ -204,7 +204,7 @@ extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);
/* localbuf.c */
/*extern BufferDesc *LocalBufferAlloc(Relation reln, BlockNumber blockNum,
bool *foundPtr);*/
extern BufferDesc *LocalBufferAlloc_SMgr(SMgrRelation reln, BlockNumber blockNum,
extern BufferDesc *LocalBufferAlloc(SMgrRelation reln, BlockNumber blockNum,
bool *foundPtr);
extern void MarkLocalBufferDirty(Buffer buffer);
extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
......
@@ -284,7 +284,8 @@ typedef struct MirroredLockBufMgrLocalVars
* prototypes for functions in bufmgr.c
*/
extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
extern Buffer ReadBuffer_Resync(SMgrRelation reln, BlockNumber blockNum);
extern Buffer ReadBuffer_Resync(SMgrRelation reln, BlockNumber blockNum,
const char *relidstr);
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
......
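
With the prototype change above, any remaining caller of ReadBuffer_Resync()
must supply the extra relidstr argument. A hypothetical minimal call site,
following the pattern used by the resync worker (variable names here are
illustrative, not from the diff; relidstr would be built as in the earlier
sketch):

    /* Hypothetical caller: 'smgr', 'blkno' and 'relidstr' are illustrative. */
    Buffer buf;

    buf = ReadBuffer_Resync(smgr, blkno, relidstr);
    LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
    /* ... inspect or flush the page ... */
    UnlockReleaseBuffer(buf);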