提交 8876e37d 编写于 作者: T Teodor Sigaev

Reduce size of critial section during vacuum full, critical

sections now isn't nested. All user-defined functions now is
called outside critsections. Small improvements in WAL
protocol.

TODO: improve XLOG replay
上级 815f5840
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.134 2006/05/10 23:18:38 tgl Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.135 2006/05/17 16:34:59 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -347,7 +347,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) ...@@ -347,7 +347,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
* Form index tuples vector to split: * Form index tuples vector to split:
* remove old tuple if t's needed and add new tuples to vector * remove old tuple if t's needed and add new tuples to vector
*/ */
itvec = gistextractbuffer(state->stack->buffer, &tlen); itvec = gistextractpage(state->stack->page, &tlen);
if ( !is_leaf ) { if ( !is_leaf ) {
/* on inner page we should remove old tuple */ /* on inner page we should remove old tuple */
int pos = state->stack->childoffnum - FirstOffsetNumber; int pos = state->stack->childoffnum - FirstOffsetNumber;
...@@ -501,7 +501,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) ...@@ -501,7 +501,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
} }
rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
offs, noffs, false, offs, noffs,
state->itup, state->ituplen, state->itup, state->ituplen,
&(state->key)); &(state->key));
...@@ -1157,7 +1157,7 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke ...@@ -1157,7 +1157,7 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
XLogRecData *rdata; XLogRecData *rdata;
rdata = formUpdateRdata(r->rd_node, buffer, rdata = formUpdateRdata(r->rd_node, buffer,
NULL, 0, false, NULL, 0,
itup, len, key); itup, len, key);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.12 2006/05/17 16:34:59 teodor Exp $
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
...@@ -112,18 +112,17 @@ gistfitpage(IndexTuple *itvec, int len) { ...@@ -112,18 +112,17 @@ gistfitpage(IndexTuple *itvec, int len) {
* Read buffer into itup vector * Read buffer into itup vector
*/ */
IndexTuple * IndexTuple *
gistextractbuffer(Buffer buffer, int *len /* out */ ) gistextractpage(Page page, int *len /* out */ )
{ {
OffsetNumber i, OffsetNumber i,
maxoff; maxoff;
IndexTuple *itvec; IndexTuple *itvec;
Page p = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(p); maxoff = PageGetMaxOffsetNumber(page);
*len = maxoff; *len = maxoff;
itvec = palloc(sizeof(IndexTuple) * maxoff); itvec = palloc(sizeof(IndexTuple) * maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
return itvec; return itvec;
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -47,23 +47,235 @@ typedef struct ...@@ -47,23 +47,235 @@ typedef struct
bool emptypage; bool emptypage;
} ArrayTuple; } ArrayTuple;
/*
* Make union of keys on page
*/
static IndexTuple
PageMakeUnionKey(GistVacuum *gv, Buffer buffer) {
Page page = BufferGetPage( buffer );
IndexTuple *vec,
tmp, res;
int veclen = 0;
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
vec = gistextractpage(page, &veclen);
/* we call gistunion() in temprorary context because user-defined functions called in gistunion()
may do not free all memory */
tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
MemoryContextSwitchTo(oldCtx);
res = (IndexTuple) palloc(IndexTupleSize(tmp));
memcpy(res, tmp, IndexTupleSize(tmp));
ItemPointerSetBlockNumber(&(res->t_tid), BufferGetBlockNumber(buffer));
GistTupleSetValid(res);
MemoryContextReset(gv->opCtx);
return res;
}
static void
gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
Buffer buffer;
Page page;
buffer = ReadBuffer(gv->index, blkno);
LockBuffer(buffer, GIST_EXCLUSIVE);
page = (Page) BufferGetPage(buffer);
if ( !GistPageIsLeaf(page) ) {
int i;
for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i = OffsetNumberNext(i)) {
ItemId iid = PageGetItemId(page, i);
IndexTuple idxtuple = (IndexTuple) PageGetItem(page, iid);
gistDeleteSubtree(gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)));
}
}
START_CRIT_SECTION();
MarkBufferDirty(buffer);
page = (Page) BufferGetPage(buffer);
GistPageSetDeleted(page);
gv->result->std.pages_deleted++;
if (!gv->index->rd_istemp)
{
XLogRecData rdata;
XLogRecPtr recptr;
gistxlogPageDelete xlrec;
xlrec.node = gv->index->rd_node;
xlrec.blkno = blkno;
rdata.buffer = InvalidBuffer;
rdata.data = (char *) &xlrec;
rdata.len = sizeof(gistxlogPageDelete);
rdata.next = NULL;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
else
PageSetLSN(page, XLogRecPtrForTemp);
END_CRIT_SECTION();
UnlockReleaseBuffer(buffer);
}
static Page
GistPageGetCopyPage( Page page ) {
Size pageSize = PageGetPageSize( page );
Page tmppage;
tmppage=(Page)palloc( pageSize );
memcpy( tmppage, page, pageSize );
return tmppage;
}
static ArrayTuple
vacuumSplitPage(GistVacuum *gv, Page tempPage, Buffer buffer, IndexTuple *addon, int curlenaddon) {
ArrayTuple res = {NULL, 0, false};
IndexTuple *vec;
SplitedPageLayout *dist = NULL,
*ptr;
int i, veclen=0;
BlockNumber blkno = BufferGetBlockNumber(buffer);
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
vec = gistextractpage(tempPage, &veclen);
vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
dist = gistSplit(gv->index, tempPage, vec, veclen, &(gv->giststate));
MemoryContextSwitchTo(oldCtx);
if (blkno != GIST_ROOT_BLKNO) {
/* if non-root split then we should not allocate new buffer */
dist->buffer = buffer;
dist->page = tempPage;
/* during vacuum we never split leaf page */
GistPageGetOpaque(dist->page)->flags = 0;
} else
pfree(tempPage);
res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
res.ituplen = 0;
/* make new pages and fills them */
for (ptr = dist; ptr; ptr = ptr->next) {
char *data;
if ( ptr->buffer == InvalidBuffer ) {
ptr->buffer = gistNewBuffer( gv->index );
GISTInitBuffer( ptr->buffer, 0 );
ptr->page = BufferGetPage(ptr->buffer);
}
ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
data = (char*)(ptr->list);
for(i=0;i<ptr->block.num;i++) {
if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
data += IndexTupleSize((IndexTuple)data);
}
ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
res.ituplen++;
}
START_CRIT_SECTION();
for (ptr = dist; ptr; ptr = ptr->next) {
MarkBufferDirty(ptr->buffer);
GistPageGetOpaque(ptr->page)->rightlink = InvalidBlockNumber;
}
/* restore splitted non-root page */
if (blkno != GIST_ROOT_BLKNO) {
PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
dist->page = BufferGetPage( dist->buffer );
}
if (!gv->index->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData *rdata;
ItemPointerData key; /* set key for incomplete
* insert */
char *xlinfo;
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
rdata = formSplitRdata(gv->index->rd_node, blkno,
false, &key, dist);
xlinfo = rdata->data;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
for (ptr = dist; ptr; ptr = ptr->next)
{
PageSetLSN(BufferGetPage(ptr->buffer), recptr);
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
}
pfree(xlinfo);
pfree(rdata);
}
else
{
for (ptr = dist; ptr; ptr = ptr->next)
PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
}
for (ptr = dist; ptr; ptr = ptr->next)
{
/* we must keep the buffer pin on the head page */
if (BufferGetBlockNumber(ptr->buffer) != blkno)
UnlockReleaseBuffer( ptr->buffer );
}
if (blkno == GIST_ROOT_BLKNO)
{
ItemPointerData key; /* set key for incomplete
* insert */
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
}
END_CRIT_SECTION();
MemoryContextReset(gv->opCtx);
return res;
}
static ArrayTuple static ArrayTuple
gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
{ {
ArrayTuple res = {NULL, 0, false}; ArrayTuple res = {NULL, 0, false};
Buffer buffer; Buffer buffer;
Page page; Page page, tempPage = NULL;
OffsetNumber i, OffsetNumber i,
maxoff; maxoff;
ItemId iid; ItemId iid;
int lenaddon = 4, int lenaddon = 4,
curlenaddon = 0, curlenaddon = 0,
ntodelete = 0; nOffToDelete = 0,
nBlkToDelete = 0;
IndexTuple idxtuple, IndexTuple idxtuple,
*addon = NULL; *addon = NULL;
bool needwrite = false; bool needwrite = false;
OffsetNumber todelete[MaxOffsetNumber]; OffsetNumber offToDelete[MaxOffsetNumber];
BlockNumber blkToDelete[MaxOffsetNumber];
ItemPointerData *completed = NULL; ItemPointerData *completed = NULL;
int ncompleted = 0, int ncompleted = 0,
lencompleted = 16; lencompleted = 16;
...@@ -76,12 +288,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) ...@@ -76,12 +288,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(page); maxoff = PageGetMaxOffsetNumber(page);
/*
* XXX need to reduce scope of changes to page so we can make this
* critical section less extensive
*/
START_CRIT_SECTION();
if (GistPageIsLeaf(page)) if (GistPageIsLeaf(page))
{ {
if (GistTuplesDeleted(page)) if (GistTuplesDeleted(page))
...@@ -92,13 +298,16 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) ...@@ -92,13 +298,16 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted); completed = (ItemPointerData *) palloc(sizeof(ItemPointerData) * lencompleted);
addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon); addon = (IndexTuple *) palloc(sizeof(IndexTuple) * lenaddon);
/* get copy of page to work */
tempPage = GistPageGetCopyPage(page);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{ {
ArrayTuple chldtuple; ArrayTuple chldtuple;
bool needchildunion; bool needchildunion;
iid = PageGetItemId(page, i); iid = PageGetItemId(tempPage, i);
idxtuple = (IndexTuple) PageGetItem(page, iid); idxtuple = (IndexTuple) PageGetItem(tempPage, iid);
needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false; needchildunion = (GistTupleIsInvalid(idxtuple)) ? true : false;
if (needchildunion) if (needchildunion)
...@@ -109,14 +318,19 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) ...@@ -109,14 +318,19 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
needchildunion); needchildunion);
if (chldtuple.ituplen || chldtuple.emptypage) if (chldtuple.ituplen || chldtuple.emptypage)
{ {
PageIndexTupleDelete(page, i); /* update tuple or/and inserts new */
todelete[ntodelete++] = i; if ( chldtuple.emptypage )
blkToDelete[nBlkToDelete++] = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
offToDelete[nOffToDelete++] = i;
PageIndexTupleDelete(tempPage, i);
i--; i--;
maxoff--; maxoff--;
needwrite = needunion = true; needwrite = needunion = true;
if (chldtuple.ituplen) if (chldtuple.ituplen)
{ {
Assert( chldtuple.emptypage == false );
while (curlenaddon + chldtuple.ituplen >= lenaddon) while (curlenaddon + chldtuple.ituplen >= lenaddon)
{ {
lenaddon *= 2; lenaddon *= 2;
...@@ -150,200 +364,102 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) ...@@ -150,200 +364,102 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
} }
} }
} }
Assert( maxoff == PageGetMaxOffsetNumber(tempPage) );
if (curlenaddon) if (curlenaddon)
{ {
/* insert updated tuples */ /* insert updated tuples */
if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber)) if (gistnospace(tempPage, addon, curlenaddon, InvalidOffsetNumber)) {
{
/* there is no space on page to insert tuples */ /* there is no space on page to insert tuples */
IndexTuple *vec; res = vacuumSplitPage(gv, tempPage, buffer, addon, curlenaddon);
SplitedPageLayout *dist = NULL, tempPage=NULL; /* vacuumSplitPage() free tempPage */
*ptr; needwrite = needunion = false; /* gistSplit already forms unions and writes pages */
int i, veclen=0; } else
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); /* enough free space */
gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber);
vec = gistextractbuffer(buffer, &veclen); }
vec = gistjoinvector(vec, &veclen, addon, curlenaddon); }
dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
MemoryContextSwitchTo(oldCtx);
if (blkno != GIST_ROOT_BLKNO) {
/* if non-root split then we should not allocate new buffer */
dist->buffer = buffer;
dist->page = BufferGetPage(dist->buffer);
GistPageGetOpaque(dist->page)->flags = 0;
}
res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
res.ituplen = 0;
/* make new pages and fills them */
for (ptr = dist; ptr; ptr = ptr->next) {
char *data;
if ( ptr->buffer == InvalidBuffer ) {
ptr->buffer = gistNewBuffer( gv->index );
GISTInitBuffer( ptr->buffer, 0 );
ptr->page = BufferGetPage(ptr->buffer);
}
ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
data = (char*)(ptr->list);
for(i=0;i<ptr->block.num;i++) {
if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
data += IndexTupleSize((IndexTuple)data);
}
ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
res.ituplen++;
MarkBufferDirty(ptr->buffer);
}
if (!gv->index->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData *rdata;
ItemPointerData key; /* set key for incomplete
* insert */
char *xlinfo;
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
rdata = formSplitRdata(gv->index->rd_node, blkno, /*
false, &key, dist); * If page is empty, we should remove pointer to it before
xlinfo = rdata->data; * deleting page (except root)
*/
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); if ( blkno != GIST_ROOT_BLKNO && ( PageIsEmpty(page) || (tempPage && PageIsEmpty(tempPage)) ) ) {
for (ptr = dist; ptr; ptr = ptr->next) /*
{ * New version of page is empty, so leave it unchanged,
PageSetLSN(BufferGetPage(ptr->buffer), recptr); * upper call will mark our page as deleted.
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); * In case of page split we never will be here...
} *
* If page was empty it can't become non-empty during processing
*/
res.emptypage = true;
UnlockReleaseBuffer(buffer);
} else {
/* write page and remove its childs if it need */
pfree(xlinfo); START_CRIT_SECTION();
pfree(rdata);
}
else
{
for (ptr = dist; ptr; ptr = ptr->next)
{
PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
}
}
for (ptr = dist; ptr; ptr = ptr->next) if ( tempPage && needwrite ) {
{ PageRestoreTempPage(tempPage, page);
/* we must keep the buffer pin on the head page */ tempPage = NULL;
if (BufferGetBlockNumber(ptr->buffer) != blkno) }
UnlockReleaseBuffer( ptr->buffer );
}
if (blkno == GIST_ROOT_BLKNO) /* Empty index */
{ if (PageIsEmpty(page) && blkno == GIST_ROOT_BLKNO )
ItemPointerData key; /* set key for incomplete {
* insert */ needwrite = true;
GistPageSetLeaf(page);
}
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
if (needwrite)
{
MarkBufferDirty(buffer);
GistClearTuplesDeleted(page);
oldCtx = MemoryContextSwitchTo(gv->opCtx); if (!gv->index->rd_istemp)
gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key); {
MemoryContextSwitchTo(oldCtx); XLogRecData *rdata;
} XLogRecPtr recptr;
char *xlinfo;
needwrite = false; rdata = formUpdateRdata(gv->index->rd_node, buffer,
offToDelete, nOffToDelete,
addon, curlenaddon, NULL);
xlinfo = rdata->next->data;
MemoryContextReset(gv->opCtx); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
needunion = false; /* gistSplit already forms unions */ pfree(xlinfo);
pfree(rdata);
} }
else else
{ PageSetLSN(page, XLogRecPtrForTemp);
/* enough free space */
gistfillbuffer(gv->index, page, addon, curlenaddon, InvalidOffsetNumber);
}
} }
}
if (needunion) END_CRIT_SECTION();
{
/* forms union for page or check empty */
if (PageIsEmpty(page))
{
if (blkno == GIST_ROOT_BLKNO)
{
needwrite = true;
GistPageSetLeaf(page);
}
else
{
needwrite = true;
res.emptypage = true;
GistPageSetDeleted(page);
gv->result->std.pages_deleted++;
}
}
else
{
IndexTuple *vec,
tmp;
int veclen = 0;
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
vec = gistextractbuffer(buffer, &veclen);
tmp = gistunion(gv->index, vec, veclen, &(gv->giststate));
MemoryContextSwitchTo(oldCtx);
if ( needunion && !PageIsEmpty(page) )
{
res.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); res.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
res.ituplen = 1; res.ituplen = 1;
res.itup[0] = (IndexTuple) palloc(IndexTupleSize(tmp)); res.itup[0] = PageMakeUnionKey(gv, buffer);
memcpy(res.itup[0], tmp, IndexTupleSize(tmp));
ItemPointerSetBlockNumber(&(res.itup[0]->t_tid), blkno);
GistTupleSetValid(res.itup[0]);
MemoryContextReset(gv->opCtx);
} }
}
if (needwrite) UnlockReleaseBuffer(buffer);
{
MarkBufferDirty(buffer);
GistClearTuplesDeleted(page);
if (!gv->index->rd_istemp)
{
XLogRecData *rdata;
XLogRecPtr recptr;
char *xlinfo;
rdata = formUpdateRdata(gv->index->rd_node, buffer,
todelete, ntodelete, res.emptypage,
addon, curlenaddon, NULL);
xlinfo = rdata->data;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); /* delete empty children, now we havn't any links to pointed subtrees */
PageSetLSN(page, recptr); for(i=0;i<nBlkToDelete;i++)
PageSetTLI(page, ThisTimeLineID); gistDeleteSubtree(gv, blkToDelete[i]);
pfree(xlinfo); if (ncompleted && !gv->index->rd_istemp)
pfree(rdata); gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
}
else
PageSetLSN(page, XLogRecPtrForTemp);
} }
END_CRIT_SECTION();
UnlockReleaseBuffer(buffer);
if (ncompleted && !gv->index->rd_istemp)
gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
for (i = 0; i < curlenaddon; i++) for (i = 0; i < curlenaddon; i++)
pfree(addon[i]); pfree(addon[i]);
...@@ -351,6 +467,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) ...@@ -351,6 +467,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
pfree(addon); pfree(addon);
if (completed) if (completed)
pfree(completed); pfree(completed);
if (tempPage)
pfree(tempPage);
return res; return res;
} }
...@@ -627,10 +746,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) ...@@ -627,10 +746,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
gistxlogPageUpdate *xlinfo; gistxlogPageUpdate *xlinfo;
rdata = formUpdateRdata(rel->rd_node, buffer, rdata = formUpdateRdata(rel->rd_node, buffer,
todelete, ntodelete, false, todelete, ntodelete,
NULL, 0, NULL, 0,
NULL); NULL);
xlinfo = (gistxlogPageUpdate *) rdata->data; xlinfo = (gistxlogPageUpdate *) rdata->next->data;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
...@@ -209,41 +209,33 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) ...@@ -209,41 +209,33 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
return; return;
} }
if (xlrec.data->isemptypage) if (isnewroot)
GISTInitBuffer(buffer, 0);
else if (xlrec.data->ntodelete)
{ {
while (!PageIsEmpty(page)) int i;
PageIndexTupleDelete(page, FirstOffsetNumber);
if (xlrec.data->blkno == GIST_ROOT_BLKNO) for (i = 0; i < xlrec.data->ntodelete; i++)
GistPageSetLeaf(page); PageIndexTupleDelete(page, xlrec.todelete[i]);
else if (GistPageIsLeaf(page))
GistPageSetDeleted(page); GistMarkTuplesDeleted(page);
} }
else
{
if (isnewroot)
GISTInitBuffer(buffer, 0);
else if (xlrec.data->ntodelete)
{
int i;
for (i = 0; i < xlrec.data->ntodelete; i++) /* add tuples */
PageIndexTupleDelete(page, xlrec.todelete[i]); if (xlrec.len > 0)
if (GistPageIsLeaf(page)) gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
GistMarkTuplesDeleted(page);
}
/* add tuples */ /*
if (xlrec.len > 0) * special case: leafpage, nothing to insert, nothing to delete, then
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber); * vacuum marks page
*/
if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
GistClearTuplesDeleted(page);
/* if ( !GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO )
* special case: leafpage, nothing to insert, nothing to delete, then /* all links on non-leaf root page was deleted by vacuum full,
* vacuum marks page so root page becomes a leaf */
*/ GistPageSetLeaf(page);
if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
GistClearTuplesDeleted(page);
}
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
...@@ -252,6 +244,29 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) ...@@ -252,6 +244,29 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
static void
gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
{
gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
Relation reln;
Buffer buffer;
Page page;
reln = XLogOpenRelation(xldata->node);
buffer = XLogReadBuffer(reln, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
GISTInitBuffer( buffer, 0 );
page = (Page) BufferGetPage(buffer);
GistPageSetDeleted(page);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
static void static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{ {
...@@ -382,6 +397,9 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -382,6 +397,9 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_GIST_PAGE_UPDATE: case XLOG_GIST_PAGE_UPDATE:
gistRedoPageUpdateRecord(lsn, record, false); gistRedoPageUpdateRecord(lsn, record, false);
break; break;
case XLOG_GIST_PAGE_DELETE:
gistRedoPageDeleteRecord(lsn, record);
break;
case XLOG_GIST_NEW_ROOT: case XLOG_GIST_NEW_ROOT:
gistRedoPageUpdateRecord(lsn, record, true); gistRedoPageUpdateRecord(lsn, record, true);
break; break;
...@@ -405,8 +423,10 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -405,8 +423,10 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
static void static void
out_target(StringInfo buf, RelFileNode node, ItemPointerData key) out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
{ {
appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u", appendStringInfo(buf, "rel %u/%u/%u",
node.spcNode, node.dbNode, node.relNode, node.spcNode, node.dbNode, node.relNode);
if ( ItemPointerIsValid( &key ) )
appendStringInfo(buf, "; tid %u/%u",
ItemPointerGetBlockNumber(&key), ItemPointerGetBlockNumber(&key),
ItemPointerGetOffsetNumber(&key)); ItemPointerGetOffsetNumber(&key));
} }
...@@ -418,6 +438,14 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) ...@@ -418,6 +438,14 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
appendStringInfo(buf, "; block number %u", xlrec->blkno); appendStringInfo(buf, "; block number %u", xlrec->blkno);
} }
static void
out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
{
appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
xlrec->blkno);
}
static void static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{ {
...@@ -438,6 +466,9 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -438,6 +466,9 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "page_update: "); appendStringInfo(buf, "page_update: ");
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec); out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
break; break;
case XLOG_GIST_PAGE_DELETE:
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
break;
case XLOG_GIST_NEW_ROOT: case XLOG_GIST_NEW_ROOT:
appendStringInfo(buf, "new_root: "); appendStringInfo(buf, "new_root: ");
out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
...@@ -643,7 +674,7 @@ gistContinueInsert(gistIncompleteInsert *insert) ...@@ -643,7 +674,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
* we split root, just copy tuples from old root to new * we split root, just copy tuples from old root to new
* page * page
*/ */
parentitup = gistextractbuffer(buffers[numbuffer - 1], parentitup = gistextractpage(pages[numbuffer - 1],
&pituplen); &pituplen);
/* sanity check */ /* sanity check */
...@@ -796,7 +827,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ...@@ -796,7 +827,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
*/ */
XLogRecData * XLogRecData *
formUpdateRdata(RelFileNode node, Buffer buffer, formUpdateRdata(RelFileNode node, Buffer buffer,
OffsetNumber *todelete, int ntodelete, bool emptypage, OffsetNumber *todelete, int ntodelete,
IndexTuple *itup, int ituplen, ItemPointer key) IndexTuple *itup, int ituplen, ItemPointer key)
{ {
XLogRecData *rdata; XLogRecData *rdata;
...@@ -804,35 +835,37 @@ formUpdateRdata(RelFileNode node, Buffer buffer, ...@@ -804,35 +835,37 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
int cur, int cur,
i; i;
/* ugly wart in API: emptypage causes us to ignore other inputs */ rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
if (emptypage)
ntodelete = ituplen = 0;
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node; xlrec->node = node;
xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete; xlrec->ntodelete = ntodelete;
xlrec->isemptypage = emptypage;
if (key) if (key)
xlrec->key = *key; xlrec->key = *key;
else else
ItemPointerSetInvalid(&(xlrec->key)); ItemPointerSetInvalid(&(xlrec->key));
rdata[0].data = (char *) xlrec; rdata[0].buffer = buffer;
rdata[0].len = sizeof(gistxlogPageUpdate); rdata[0].buffer_std = true;
rdata[0].buffer = InvalidBuffer; rdata[0].data = NULL;
rdata[0].len = 0;
rdata[0].next = &(rdata[1]); rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) todelete; rdata[1].data = (char *) xlrec;
rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); rdata[1].len = sizeof(gistxlogPageUpdate);
rdata[1].buffer = buffer; rdata[1].buffer = InvalidBuffer;
rdata[1].buffer_std = true; rdata[1].next = &(rdata[2]);
rdata[1].next = NULL;
rdata[2].data = (char *) todelete;
rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
rdata[2].buffer = buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
/* new tuples */ /* new tuples */
cur = 2; cur = 3;
for (i = 0; i < ituplen; i++) for (i = 0; i < ituplen; i++)
{ {
rdata[cur - 1].next = &(rdata[cur]); rdata[cur - 1].next = &(rdata[cur]);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.14 2006/05/17 16:34:59 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -85,20 +85,21 @@ extern const XLogRecPtr XLogRecPtrForTemp; ...@@ -85,20 +85,21 @@ extern const XLogRecPtr XLogRecPtrForTemp;
#define XLOG_GIST_PAGE_SPLIT 0x30 #define XLOG_GIST_PAGE_SPLIT 0x30
#define XLOG_GIST_INSERT_COMPLETE 0x40 #define XLOG_GIST_INSERT_COMPLETE 0x40
#define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_CREATE_INDEX 0x50
#define XLOG_GIST_PAGE_DELETE 0x60
typedef struct gistxlogPageUpdate typedef struct gistxlogPageUpdate
{ {
RelFileNode node; RelFileNode node;
BlockNumber blkno; BlockNumber blkno;
uint16 ntodelete;
bool isemptypage;
/* /*
* It used to identify completeness of insert. Sets to leaf itup * It used to identify completeness of insert. Sets to leaf itup
*/ */
ItemPointerData key; ItemPointerData key;
/* number of deleted offsets */
uint16 ntodelete;
/* /*
* follow: 1. todelete OffsetNumbers 2. tuples to insert * follow: 1. todelete OffsetNumbers 2. tuples to insert
*/ */
...@@ -131,6 +132,11 @@ typedef struct gistxlogInsertComplete ...@@ -131,6 +132,11 @@ typedef struct gistxlogInsertComplete
/* follows ItemPointerData key to clean */ /* follows ItemPointerData key to clean */
} gistxlogInsertComplete; } gistxlogInsertComplete;
typedef struct gistxlogPageDelete
{
RelFileNode node;
BlockNumber blkno;
} gistxlogPageDelete;
/* SplitedPageLayout - gistSplit function result */ /* SplitedPageLayout - gistSplit function result */
typedef struct SplitedPageLayout typedef struct SplitedPageLayout
...@@ -249,7 +255,7 @@ extern void gist_xlog_cleanup(void); ...@@ -249,7 +255,7 @@ extern void gist_xlog_cleanup(void);
extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
OffsetNumber *todelete, int ntodelete, bool emptypage, OffsetNumber *todelete, int ntodelete,
IndexTuple *itup, int ituplen, ItemPointer key); IndexTuple *itup, int ituplen, ItemPointer key);
extern XLogRecData *formSplitRdata(RelFileNode node, extern XLogRecData *formSplitRdata(RelFileNode node,
...@@ -273,7 +279,7 @@ extern void gistcheckpage(Relation rel, Buffer buf); ...@@ -273,7 +279,7 @@ extern void gistcheckpage(Relation rel, Buffer buf);
extern Buffer gistNewBuffer(Relation r); extern Buffer gistNewBuffer(Relation r);
extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off); int len, OffsetNumber off);
extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ ); extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
extern IndexTuple *gistjoinvector( extern IndexTuple *gistjoinvector(
IndexTuple *itvec, int *len, IndexTuple *itvec, int *len,
IndexTuple *additvec, int addlen); IndexTuple *additvec, int addlen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册