提交 d65522ae 编写于 作者: T Tom Lane

Upgrade localbuf.c to use a hash table instead of linear search to

find already-allocated local buffers.  This is the last obstacle
in the way of setting NLocBuffer to something reasonably large.
上级 2e629080
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.64 2005/03/18 16:16:09 tgl Exp $ * $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.65 2005/03/19 17:39:43 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -18,18 +18,24 @@ ...@@ -18,18 +18,24 @@
#include "storage/buf_internals.h" #include "storage/buf_internals.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/relcache.h" #include "utils/memutils.h"
#include "utils/resowner.h" #include "utils/resowner.h"
/*#define LBDEBUG*/ /*#define LBDEBUG*/
/* entry for buffer lookup hashtable */
typedef struct
{
BufferTag key; /* Tag of a disk page */
int id; /* Associated local buffer's index */
} LocalBufferLookupEnt;
/* Note: this macro only works on local buffers, not shared ones! */ /* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr) \ #define LocalBufHdrGetBlock(bufHdr) \
LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
/* should be a GUC parameter some day */ int NLocBuffer = 0; /* until buffers are initialized */
int NLocBuffer = 64;
BufferDesc *LocalBufferDescriptors = NULL; BufferDesc *LocalBufferDescriptors = NULL;
Block *LocalBufferBlockPointers = NULL; Block *LocalBufferBlockPointers = NULL;
...@@ -37,10 +43,12 @@ int32 *LocalRefCount = NULL; ...@@ -37,10 +43,12 @@ int32 *LocalRefCount = NULL;
static int nextFreeLocalBuf = 0; static int nextFreeLocalBuf = 0;
static HTAB *LocalBufHash = NULL;
/* /*
* LocalBufferAlloc - * LocalBufferAlloc -
* allocate a local buffer. We do round robin allocation for now. * Find or create a local buffer for the given page of the given relation.
* *
* API is similar to bufmgr.c's BufferAlloc, except that we do not need * API is similar to bufmgr.c's BufferAlloc, except that we do not need
* to do any locking since this is all local. Also, IO_IN_PROGRESS * to do any locking since this is all local. Also, IO_IN_PROGRESS
...@@ -50,35 +58,39 @@ BufferDesc * ...@@ -50,35 +58,39 @@ BufferDesc *
LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr) LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
{ {
BufferTag newTag; /* identity of requested block */ BufferTag newTag; /* identity of requested block */
int i; LocalBufferLookupEnt *hresult;
int trycounter;
BufferDesc *bufHdr; BufferDesc *bufHdr;
int b;
int trycounter;
bool found;
INIT_BUFFERTAG(newTag, reln, blockNum); INIT_BUFFERTAG(newTag, reln, blockNum);
/* a low tech search for now -- should use a hashtable */ /* See if the desired buffer already exists */
for (i = 0; i < NLocBuffer; i++) hresult = (LocalBufferLookupEnt *)
hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
if (hresult)
{ {
bufHdr = &LocalBufferDescriptors[i]; b = hresult->id;
if (BUFFERTAGS_EQUAL(bufHdr->tag, newTag)) bufHdr = &LocalBufferDescriptors[b];
{ Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
#ifdef LBDEBUG #ifdef LBDEBUG
fprintf(stderr, "LB ALLOC (%u,%d) %d\n", fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
RelationGetRelid(reln), blockNum, -i - 1); RelationGetRelid(reln), blockNum, -b - 1);
#endif #endif
LocalRefCount[i]++; LocalRefCount[b]++;
ResourceOwnerRememberBuffer(CurrentResourceOwner, ResourceOwnerRememberBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(bufHdr)); BufferDescriptorGetBuffer(bufHdr));
if (bufHdr->flags & BM_VALID) if (bufHdr->flags & BM_VALID)
*foundPtr = TRUE; *foundPtr = TRUE;
else else
{ {
/* Previous read attempt must have failed; try again */ /* Previous read attempt must have failed; try again */
*foundPtr = FALSE; *foundPtr = FALSE;
}
return bufHdr;
} }
return bufHdr;
} }
#ifdef LBDEBUG #ifdef LBDEBUG
...@@ -93,7 +105,7 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr) ...@@ -93,7 +105,7 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
trycounter = NLocBuffer; trycounter = NLocBuffer;
for (;;) for (;;)
{ {
int b = nextFreeLocalBuf; b = nextFreeLocalBuf;
if (++nextFreeLocalBuf >= NLocBuffer) if (++nextFreeLocalBuf >= NLocBuffer)
nextFreeLocalBuf = 0; nextFreeLocalBuf = 0;
...@@ -136,31 +148,50 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr) ...@@ -136,31 +148,50 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
(char *) LocalBufHdrGetBlock(bufHdr), (char *) LocalBufHdrGetBlock(bufHdr),
true); true);
/* Mark not-dirty now in case we error out below */
bufHdr->flags &= ~BM_DIRTY;
LocalBufferFlushCount++; LocalBufferFlushCount++;
} }
/* /*
* lazy memory allocation: allocate space on first use of a buffer. * lazy memory allocation: allocate space on first use of a buffer.
*
* Note this path cannot be taken for a buffer that was previously in
* use, so it's okay to do it (and possibly error out) before marking
* the buffer as not dirty.
*/ */
if (LocalBufHdrGetBlock(bufHdr) == NULL) if (LocalBufHdrGetBlock(bufHdr) == NULL)
{ {
char *data = (char *) malloc(BLCKSZ); char *data;
if (data == NULL) data = (char *) MemoryContextAlloc(TopMemoryContext, BLCKSZ);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
/* /* Set pointer for use by BufferGetBlock() macro */
* Set pointer for use by BufferGetBlock() macro.
*/
LocalBufHdrGetBlock(bufHdr) = (Block) data; LocalBufHdrGetBlock(bufHdr) = (Block) data;
} }
/*
* Update the hash table: remove old entry, if any, and make new one.
*/
if (bufHdr->flags & BM_TAG_VALID)
{
hresult = (LocalBufferLookupEnt *)
hash_search(LocalBufHash, (void *) &bufHdr->tag,
HASH_REMOVE, NULL);
if (!hresult) /* shouldn't happen */
elog(ERROR, "local buffer hash table corrupted");
/* mark buffer invalid just in case hash insert fails */
CLEAR_BUFFERTAG(bufHdr->tag);
bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID);
}
hresult = (LocalBufferLookupEnt *)
hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
if (!hresult)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
if (found) /* shouldn't happen */
elog(ERROR, "local buffer hash table corrupted");
hresult->id = b;
/* /*
* it's all ours now. * it's all ours now.
*/ */
...@@ -215,20 +246,39 @@ WriteLocalBuffer(Buffer buffer, bool release) ...@@ -215,20 +246,39 @@ WriteLocalBuffer(Buffer buffer, bool release)
void void
InitLocalBuffer(void) InitLocalBuffer(void)
{ {
int nbufs = 64; /* should be from a GUC var */
HASHCTL info;
int i; int i;
/* /* Create the lookup hash table */
* these aren't going away. I'm not gonna use palloc. MemSet(&info, 0, sizeof(info));
*/ info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(LocalBufferLookupEnt);
info.hash = tag_hash;
LocalBufHash = hash_create("Local Buffer Lookup Table",
nbufs,
&info,
HASH_ELEM | HASH_FUNCTION);
if (!LocalBufHash)
elog(ERROR, "could not initialize local buffer hash table");
/* Allocate and zero buffer headers and auxiliary arrays */
LocalBufferDescriptors = (BufferDesc *) LocalBufferDescriptors = (BufferDesc *)
calloc(NLocBuffer, sizeof(*LocalBufferDescriptors)); MemoryContextAllocZero(TopMemoryContext,
nbufs * sizeof(BufferDesc));
LocalBufferBlockPointers = (Block *) LocalBufferBlockPointers = (Block *)
calloc(NLocBuffer, sizeof(*LocalBufferBlockPointers)); MemoryContextAllocZero(TopMemoryContext,
nbufs * sizeof(Block));
LocalRefCount = (int32 *) LocalRefCount = (int32 *)
calloc(NLocBuffer, sizeof(*LocalRefCount)); MemoryContextAllocZero(TopMemoryContext,
nbufs * sizeof(int32));
nextFreeLocalBuf = 0; nextFreeLocalBuf = 0;
for (i = 0; i < NLocBuffer; i++) /* initialize fields that need to start off nonzero */
for (i = 0; i < nbufs; i++)
{ {
BufferDesc *buf = &LocalBufferDescriptors[i]; BufferDesc *buf = &LocalBufferDescriptors[i];
...@@ -240,6 +290,9 @@ InitLocalBuffer(void) ...@@ -240,6 +290,9 @@ InitLocalBuffer(void)
*/ */
buf->buf_id = -i - 2; buf->buf_id = -i - 2;
} }
/* Initialization done, mark buffers allocated */
NLocBuffer = nbufs;
} }
/* /*
...@@ -271,5 +324,6 @@ void ...@@ -271,5 +324,6 @@ void
AtProcExit_LocalBuffers(void) AtProcExit_LocalBuffers(void)
{ {
/* just zero the refcounts ... */ /* just zero the refcounts ... */
MemSet(LocalRefCount, 0, NLocBuffer * sizeof(*LocalRefCount)); if (LocalRefCount)
MemSet(LocalRefCount, 0, NLocBuffer * sizeof(*LocalRefCount));
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册