diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 89ba07135548610abae6b7da1dbb8e3689e20649..02c2ffefa2388cc3d335ac0ae0d76d8bb5d88240 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.121 2005/06/20 15:22:37 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.122 2005/06/27 12:45:21 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -23,6 +23,8 @@ #include "miscadmin.h" #include "utils/memutils.h" +const XLogRecPtr XLogRecPtrForTemp = { 1, 1 }; + /* Working state for gistbuild and its callback */ typedef struct { @@ -101,7 +103,7 @@ gistbuild(PG_FUNCTION_ARGS) initGISTstate(&buildstate.giststate, index); /* initialize the root page */ - buffer = gistReadBuffer(index, P_NEW); + buffer = gistNewBuffer(index); GISTInitBuffer(buffer, F_LEAF); if ( !index->rd_istemp ) { XLogRecPtr recptr; @@ -122,7 +124,9 @@ gistbuild(PG_FUNCTION_ARGS) PageSetTLI(page, ThisTimeLineID); END_CRIT_SECTION(); - } + } else + PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp); + LockBuffer(buffer, GIST_UNLOCK); WriteBuffer(buffer); /* build the index */ @@ -228,12 +232,6 @@ gistinsert(PG_FUNCTION_ARGS) MemoryContext oldCtx; MemoryContext insertCtx; - /* - * Since GIST is not marked "amconcurrent" in pg_am, caller should - * have acquired exclusive lock on index relation. We need no locking - * here. 
- */ - /* GiST cannot index tuples with leading NULLs */ if (isnull[0]) PG_RETURN_BOOL(false); @@ -290,8 +288,7 @@ gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) state.key = itup->t_tid; state.needInsertComplete = true; - state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack)); - memset( state.stack, 0, sizeof(GISTInsertStack)); + state.stack = (GISTInsertStack*)palloc0(sizeof(GISTInsertStack)); state.stack->blkno=GIST_ROOT_BLKNO; gistfindleaf(&state, giststate); @@ -301,7 +298,19 @@ gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) static bool gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { bool is_splitted = false; + bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; + + if ( !is_leaf ) + /* + * This node's key has been modified, either because a child + * split occurred or because we needed to adjust our key for + * an insert in a child node. Therefore, remove the old + * version of this node's key. + */ + + PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); + if (gistnospace(state->stack->page, state->itup, state->ituplen)) { /* no space for insertion */ @@ -321,7 +330,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { XLogRecData *rdata; rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, - &(state->key), state->path, state->pathlen, dist); + &(state->key), dist); START_CRIT_SECTION(); @@ -334,47 +343,106 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { } END_CRIT_SECTION(); - } - - ptr = dist; - while(ptr) { - WriteBuffer(ptr->buffer); - ptr=ptr->next; + } else { + ptr = dist; + while(ptr) { + PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp); + ptr=ptr->next; + } } state->itup = newitup; state->ituplen = tlen; /* now tlen >= 2 */ if ( state->stack->blkno == GIST_ROOT_BLKNO ) { - gistnewroot(state->r, state->itup, state->ituplen, &(state->key)); + gistnewroot(state->r, state->stack->buffer, state->itup, 
state->ituplen, &(state->key)); state->needInsertComplete=false; + ptr = dist; + while(ptr) { + Page page = (Page)BufferGetPage(ptr->buffer); + GistPageGetOpaque(page)->rightlink = ( ptr->next ) ? + ptr->next->block.blkno : InvalidBlockNumber; + LockBuffer( ptr->buffer, GIST_UNLOCK ); + WriteBuffer(ptr->buffer); + ptr=ptr->next; + } + } else { + Page page; + BlockNumber rightrightlink = InvalidBlockNumber; + SplitedPageLayout *ourpage=NULL; + GistNSN oldnsn; + GISTPageOpaque opaque; + + /* move origpage to first in chain */ + if ( dist->block.blkno != state->stack->blkno ) { + ptr = dist; + while(ptr->next) { + if ( ptr->next->block.blkno == state->stack->blkno ) { + ourpage = ptr->next; + ptr->next = ptr->next->next; + ourpage->next = dist; + dist = ourpage; + break; + } + ptr=ptr->next; + } + Assert( ourpage != NULL ); + } else + ourpage = dist; + + + /* now gets all needed data, and sets nsn's */ + page = (Page)BufferGetPage(ourpage->buffer); + opaque = GistPageGetOpaque(page); + rightrightlink = opaque->rightlink; + oldnsn = opaque->nsn; + opaque->nsn = PageGetLSN(page); + opaque->rightlink = ourpage->next->block.blkno; + + /* fills and write all new pages. + They isn't linked into tree yet */ + + ptr = ourpage->next; + while(ptr) { + page = (Page)BufferGetPage(ptr->buffer); + GistPageGetOpaque(page)->rightlink = ( ptr->next ) ? + ptr->next->block.blkno : rightrightlink; + /* only for last set oldnsn */ + GistPageGetOpaque(page)->nsn = ( ptr->next ) ? + opaque->nsn : oldnsn; + + LockBuffer(ptr->buffer, GIST_UNLOCK); + WriteBuffer(ptr->buffer); + ptr=ptr->next; + } } - ReleaseBuffer(state->stack->buffer); + WriteNoReleaseBuffer( state->stack->buffer ); } else { /* enough space */ - OffsetNumber off, l; - bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; + OffsetNumber l, off; + XLogRecPtr oldlsn; - off = (PageIsEmpty(state->stack->page)) ? 
- FirstOffsetNumber - : - OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page)); + off = ( PageIsEmpty(state->stack->page) ) ? + FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page)); + l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off); + oldlsn = PageGetLSN(state->stack->page); if ( !state->r->rd_istemp ) { OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ]; XLogRecPtr recptr; XLogRecData *rdata; - if ( state->stack->todelete ) { + if ( !is_leaf ) { + /*only on inner page we should delete previous version */ offs[0] = state->stack->childoffnum; noffs=1; } rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno, offs, noffs, false, state->itup, state->ituplen, - &(state->key), state->path, state->pathlen); + &(state->key)); START_CRIT_SECTION(); @@ -383,11 +451,16 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { PageSetTLI(state->stack->page, ThisTimeLineID); END_CRIT_SECTION(); - } + } else + PageSetLSN(state->stack->page, XLogRecPtrForTemp); if ( state->stack->blkno == GIST_ROOT_BLKNO ) state->needInsertComplete=false; - WriteBuffer(state->stack->buffer); + WriteNoReleaseBuffer(state->stack->buffer); + + if (!is_leaf) /* small optimization: inform scan about deleting... 
*/ + gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, + state->stack->childoffnum, PageGetLSN(state->stack->page), oldlsn ); if (state->ituplen > 1) { /* previous is_splitted==true */ @@ -409,17 +482,42 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { return is_splitted; } +/* + * returns stack of pages, all pages in stack are pinned, and + * leaf is X-locked + */ + static void gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) { ItemId iid; - IndexTuple oldtup; - GISTInsertStack *ptr; + IndexTuple idxtuple; + GISTPageOpaque opaque; + + /* walk down, We don't lock page for a long time, but so + we should be ready to recheck path in a bad case... + We remember, that page->lsn should never be invalid. */ + while( true ) { + + if ( XLogRecPtrIsInvalid( state->stack->lsn ) ) + state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); + LockBuffer( state->stack->buffer, GIST_SHARE ); - /* walk down */ - while( true ) { - state->stack->buffer = gistReadBuffer(state->r, state->stack->blkno); state->stack->page = (Page) BufferGetPage(state->stack->buffer); + opaque = GistPageGetOpaque(state->stack->page); + + state->stack->lsn = PageGetLSN(state->stack->page); + Assert( state->r->rd_istemp || !XLogRecPtrIsInvalid( state->stack->lsn ) ); + + if ( state->stack->blkno != GIST_ROOT_BLKNO && + XLByteLT( state->stack->parent->lsn, opaque->nsn) ) { + /* caused split non-root page is detected, go up to parent to choose best child */ + LockBuffer( state->stack->buffer, GIST_UNLOCK ); + ReleaseBuffer( state->stack->buffer ); + state->stack = state->stack->parent; + continue; + } + if (!GistPageIsLeaf(state->stack->page)) { @@ -432,42 +530,236 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) * split, or the key in this node needs to be adjusted for the * newly inserted key below us. 
*/ - GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack)); + GISTInsertStack *item=(GISTInsertStack*)palloc0(sizeof(GISTInsertStack)); state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); iid = PageGetItemId(state->stack->page, state->stack->childoffnum); - oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); - item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid); + item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); + LockBuffer( state->stack->buffer, GIST_UNLOCK ); + item->parent = state->stack; - item->todelete = false; + item->child = NULL; + if ( state->stack ) + state->stack->child = item; state->stack = item; - } else + } else { + /* be careful, during unlock/lock page may be changed... */ + LockBuffer( state->stack->buffer, GIST_UNLOCK ); + LockBuffer( state->stack->buffer, GIST_EXCLUSIVE ); + state->stack->page = (Page) BufferGetPage(state->stack->buffer); + opaque = GistPageGetOpaque(state->stack->page); + + if ( state->stack->blkno == GIST_ROOT_BLKNO ) { + /* the only page can become inner instead of leaf is a root page, + so for root we should recheck it */ + if ( !GistPageIsLeaf(state->stack->page) ) { + /* very rare situation: during unlock/lock index + with number of pages = 1 was increased */ + LockBuffer( state->stack->buffer, GIST_UNLOCK ); + continue; + } + /* we don't need to check root split, because checking + leaf/inner is enough to recognize split for root */ + + } else if ( XLByteLT( state->stack->parent->lsn, opaque->nsn) ) { + /* detecting split during unlock/lock, so we should + find better child on parent*/ + + /* forget buffer */ + LockBuffer( state->stack->buffer, GIST_UNLOCK ); + ReleaseBuffer( state->stack->buffer ); + + state->stack = state->stack->parent; + continue; + } + + state->stack->lsn = PageGetLSN( state->stack->page ); + + /* ok we found a leaf page and it X-locked */ 
break; + } } - /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */ + /* now state->stack->(page, buffer and blkno) points to leaf page */ +} - /* form state->path to work xlog */ - ptr = state->stack; - state->pathlen=1; - while( ptr ) { - state->pathlen++; - ptr=ptr->parent; - } - state->path=(BlockNumber*)palloc(MAXALIGN(sizeof(BlockNumber)*state->pathlen)); - ptr = state->stack; - state->pathlen=0; - while( ptr ) { - state->path[ state->pathlen ] = ptr->blkno; - state->pathlen++; - ptr=ptr->parent; +/* + * Should have the same interface as XLogReadBuffer + */ +static Buffer +gistReadAndLockBuffer( bool unused, Relation r, BlockNumber blkno ) { + Buffer buffer = ReadBuffer( r, blkno ); + LockBuffer( buffer, GIST_SHARE ); + return buffer; +} + +/* + * Traverse the tree to find path from root page, + * to prevent deadlocks, it should lock only one page simultaneously. + * Function is used in recovery and in usual mode, so it should work with different + * read functions (gistReadAndLockBuffer and XLogReadBuffer) + * returns from the beginning of closest parent; + */ +GISTInsertStack* +gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(bool, Relation, BlockNumber) ) { + Page page; + Buffer buffer; + OffsetNumber i, maxoff; + ItemId iid; + IndexTuple idxtuple; + GISTInsertStack *top, *tail, *ptr; + BlockNumber blkno; + + top = tail = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) ); + top->blkno = GIST_ROOT_BLKNO; + + while( top && top->blkno != child ) { + buffer = myReadBuffer(false, r, top->blkno); /* buffer locked */ + page = (Page)BufferGetPage( buffer ); + Assert( !GistPageIsLeaf(page) ); + + top->lsn = PageGetLSN(page); + + if ( top->parent && XLByteLT( top->parent->lsn, GistPageGetOpaque(page)->nsn) && + GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */) { + /* page was split while we were thinking of... 
*/ + ptr = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) ); + ptr->blkno = GistPageGetOpaque(page)->rightlink; + ptr->childoffnum = InvalidOffsetNumber; + ptr->parent = top; + ptr->next = NULL; + tail->next = ptr; + tail = ptr; + } + + maxoff = PageGetMaxOffsetNumber(page); + + for(i = FirstOffsetNumber; i<= maxoff; i = OffsetNumberNext(i)) { + iid = PageGetItemId(page, i); + idxtuple = (IndexTuple) PageGetItem(page, iid); + blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); + if ( blkno == child ) { + OffsetNumber poff = InvalidOffsetNumber; + + /* make childs links */ + ptr = top; + while( ptr->parent ) { + /* set child link */ + ptr->parent->child = ptr; + /* move childoffnum.. */ + if ( ptr == top ) { + /*first iteration*/ + poff = ptr->parent->childoffnum; + ptr->parent->childoffnum = ptr->childoffnum; + } else { + OffsetNumber tmp = ptr->parent->childoffnum; + ptr->parent->childoffnum = poff; + poff = tmp; + } + ptr = ptr->parent; + } + top->childoffnum = i; + LockBuffer( buffer, GIST_UNLOCK ); + ReleaseBuffer( buffer ); + return top; + } else if ( GistPageGetOpaque(page)->level> 0 ) { + /* Install next inner page to the end of stack */ + ptr = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) ); + ptr->blkno = blkno; + ptr->childoffnum = i; /* set offsetnumber of child to child !!! 
*/ + ptr->parent = top; + ptr->next = NULL; + tail->next = ptr; + tail = ptr; + } + } + + LockBuffer( buffer, GIST_UNLOCK ); + ReleaseBuffer( buffer ); + top = top->next; } - state->pathlen--; - state->path++; + + return NULL; } +/* + * Returns X-locked parent of stack page + */ + +static void +gistFindCorrectParent( Relation r, GISTInsertStack *child ) { + GISTInsertStack *parent = child->parent; + + LockBuffer( parent->buffer, GIST_EXCLUSIVE ); + parent->page = (Page)BufferGetPage( parent->buffer ); + + + /* here we don't need to distinguish between split and page update */ + if ( parent->childoffnum == InvalidOffsetNumber || !XLByteEQ( parent->lsn, PageGetLSN(parent->page) ) ) { + /* parent is changed, look child in right links until found */ + OffsetNumber i, maxoff; + ItemId iid; + IndexTuple idxtuple; + GISTInsertStack *ptr; + + while(true) { + maxoff = PageGetMaxOffsetNumber(parent->page); + for(i = FirstOffsetNumber; i<= maxoff; i = OffsetNumberNext(i)) { + iid = PageGetItemId(parent->page, i); + idxtuple = (IndexTuple) PageGetItem(parent->page, iid); + if ( ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno ) { + /* yes!!, found */ + parent->childoffnum = i; + return; + } + } + + parent->blkno = GistPageGetOpaque( parent->page )->rightlink; + LockBuffer( parent->buffer, GIST_UNLOCK ); + ReleaseBuffer( parent->buffer ); + if ( parent->blkno == InvalidBlockNumber ) + /* end of chain and still didn't found parent, + It's very-very rare situation when root splited */ + break; + parent->buffer = ReadBuffer( r, parent->blkno ); + LockBuffer( parent->buffer, GIST_EXCLUSIVE ); + parent->page = (Page)BufferGetPage( parent->buffer ); + } + + /* awful!!, we need search tree to find parent ... 
, + but before we should release all old parent */ + + ptr = child->parent->parent; /* child->parent already released above */ + while(ptr) { + ReleaseBuffer( ptr->buffer ); + ptr = ptr->parent; + } + + /* ok, find new path */ + ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer); + Assert( ptr!=NULL ); + + /* read all buffers as supposed in caller */ + while( ptr ) { + ptr->buffer = ReadBuffer( r, ptr->blkno ); + ptr->page = (Page)BufferGetPage( ptr->buffer ); + ptr = ptr->parent; + } + + /* install new chain of parents to stack */ + child->parent = parent; + parent->child = child; + + /* make recursive call to normal processing */ + gistFindCorrectParent( r, child ); + } + + return; +} + void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { int is_splitted; @@ -482,19 +774,25 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { * then itup contains additional for adjustment of current key */ + if ( state->stack->parent ) { + /* X-lock parent page before proceed child, + gistFindCorrectParent should find and lock it */ + gistFindCorrectParent( state->r, state->stack ); + } is_splitted = gistplacetopage(state, giststate); - /* pop page from stack */ + /* parent locked above, so release child buffer */ + LockBuffer(state->stack->buffer, GIST_UNLOCK ); + ReleaseBuffer( state->stack->buffer ); + + /* pop parent page from stack */ state->stack = state->stack->parent; - state->pathlen--; - state->path++; /* stack is void */ if ( ! 
state->stack ) break; - - /* child did not split */ + /* child did not split, so we can check is it needed to update parent tuple */ if (!is_splitted) { /* parent's tuple */ @@ -502,34 +800,16 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); - if (!newtup) /* not need to update key */ + if (!newtup) { /* not need to update key */ + LockBuffer( state->stack->buffer, GIST_UNLOCK ); break; + } state->itup[0] = newtup; - } - - /* - * This node's key has been modified, either because a child - * split occurred or because we needed to adjust our key for - * an insert in a child node. Therefore, remove the old - * version of this node's key. - */ - - gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum); - PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); - if ( !state->r->rd_istemp ) - state->stack->todelete = true; - - /* - * if child was splitted, new key for child will be inserted in - * the end list of child, so we must say to any scans that page is - * changed beginning from 'child' offset - */ - if (is_splitted) - gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum); + } } /* while */ - /* release all buffers */ + /* release all parent buffers */ while( state->stack ) { ReleaseBuffer(state->stack->buffer); state->stack = state->stack->parent; @@ -577,9 +857,11 @@ gistSplit(Relation r, OffsetNumber *realoffset; IndexTuple *cleaneditup = itup; int lencleaneditup = *len; + int level; p = (Page) BufferGetPage(buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(p); + opaque = GistPageGetOpaque(p); + level = opaque->level; /* * The root of the tree is the first block in the relation. 
If we're @@ -588,23 +870,25 @@ gistSplit(Relation r, */ if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO) { - leftbuf = gistReadBuffer(r, P_NEW); + leftbuf = gistNewBuffer(r); GISTInitBuffer(leftbuf, opaque->flags&F_LEAF); lbknum = BufferGetBlockNumber(leftbuf); left = (Page) BufferGetPage(leftbuf); + GistPageGetOpaque(left)->level = level; } else { leftbuf = buffer; - IncrBufferRefCount(buffer); + /* IncrBufferRefCount(buffer); */ lbknum = BufferGetBlockNumber(buffer); left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData)); } - rightbuf = gistReadBuffer(r, P_NEW); + rightbuf = gistNewBuffer(r); GISTInitBuffer(rightbuf, opaque->flags&F_LEAF); rbknum = BufferGetBlockNumber(rightbuf); right = (Page) BufferGetPage(rightbuf); + GistPageGetOpaque(right)->level = level; /* generate the item array */ realoffset = palloc((*len + 1) * sizeof(OffsetNumber)); @@ -711,7 +995,7 @@ gistSplit(Relation r, { nlen = v.spl_nright; newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate); - ReleaseBuffer(rightbuf); + /* ReleaseBuffer(rightbuf); */ } else { @@ -745,7 +1029,7 @@ gistSplit(Relation r, IndexTuple *lntup; lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate); - ReleaseBuffer(leftbuf); + /* ReleaseBuffer(leftbuf); */ newtup = gistjoinvector(newtup, &nlen, lntup, llen); } @@ -785,14 +1069,16 @@ gistSplit(Relation r, } void -gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key) +gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key) { - Buffer buffer; Page page; + int level; - buffer = gistReadBuffer(r, GIST_ROOT_BLKNO); - GISTInitBuffer(buffer, 0); + Assert( BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO ); page = BufferGetPage(buffer); + level = GistPageGetOpaque(page)->level; + GISTInitBuffer(buffer, 0); + GistPageGetOpaque(page)->level = level+1; gistfillbuffer(r, page, itup, len, FirstOffsetNumber); if ( !r->rd_istemp ) { @@ -800,8 +1086,7 @@ gistnewroot(Relation r, IndexTuple *itup, int len, 
ItemPointer key) XLogRecData *rdata; rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO, - NULL, 0, false, itup, len, - key, NULL, 0); + NULL, 0, false, itup, len, key); START_CRIT_SECTION(); @@ -810,8 +1095,8 @@ gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key) PageSetTLI(page, ThisTimeLineID); END_CRIT_SECTION(); - } - WriteBuffer(buffer); + } else + PageSetLSN(page, XLogRecPtrForTemp); } void diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 4bce9962f3a3962afbaf562e2dbd1b0ec1f8070e..823defa3a60fbbdd5dcd0c112beea7308df32e3e 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.50 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -21,10 +21,63 @@ static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir); -static bool gistnext(IndexScanDesc scan, ScanDirection dir); +static int gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples); static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan, OffsetNumber offset); +static void +killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) { + Buffer buffer = so->curbuf; + + for(;;) { + Page p; + BlockNumber blkno; + OffsetNumber offset, maxoff; + + LockBuffer( buffer, GIST_SHARE ); + p = (Page)BufferGetPage( buffer ); + + if ( buffer == so->curbuf && XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) { + /* page unchanged, so all is simple */ + offset = ItemPointerGetOffsetNumber(iptr); + PageGetItemId(p, offset)->lp_flags |= LP_DELETE; + SetBufferCommitInfoNeedsSave(buffer); + LockBuffer( buffer, GIST_UNLOCK ); + break; + } 
+ + maxoff = PageGetMaxOffsetNumber( p ); + + for(offset = FirstOffsetNumber; offset<= maxoff; offset = OffsetNumberNext(offset)) { + IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset)); + + if ( ItemPointerEquals( &(ituple->t_tid), iptr ) ) { + /* found */ + PageGetItemId(p, offset)->lp_flags |= LP_DELETE; + SetBufferCommitInfoNeedsSave(buffer); + LockBuffer( buffer, GIST_UNLOCK ); + if ( buffer != so->curbuf ) + ReleaseBuffer( buffer ); + return; + } + } + + /* follow right link */ + /* + * ??? is it good? if tuple dropped by concurrent vacuum, + * we will read all leaf pages... + */ + blkno = GistPageGetOpaque(p)->rightlink; + LockBuffer( buffer, GIST_UNLOCK ); + if ( buffer != so->curbuf ) + ReleaseBuffer( buffer ); + + if ( blkno==InvalidBlockNumber ) + /* can't found, dropped by somebody else */ + return; + buffer = ReadBuffer( r, blkno ); + } +} /* * gistgettuple() -- Get the next tuple in the scan @@ -34,48 +87,27 @@ gistgettuple(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1); - Page page; - OffsetNumber offnum; GISTScanOpaque so; + ItemPointerData tid; + bool res; so = (GISTScanOpaque) scan->opaque; /* * If we have produced an index tuple in the past and the executor * has informed us we need to mark it as "killed", do so now. - * - * XXX: right now there is no concurrent access. In the - * future, we should (a) get a read lock on the page (b) check - * that the location of the previously-fetched tuple hasn't - * changed due to concurrent insertions. 
*/ - if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData))) - { - offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData)); - page = BufferGetPage(so->curbuf); - PageGetItemId(page, offnum)->lp_flags |= LP_DELETE; - SetBufferCommitInfoNeedsSave(so->curbuf); - } + if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData))) + killtuple(scan->indexRelation, so, &(scan->currentItemData)); /* * Get the next tuple that matches the search key. If asked to * skip killed tuples, continue looping until we find a non-killed * tuple that matches the search key. */ - for (;;) - { - bool res = gistnext(scan, dir); - - if (res == true && scan->ignore_killed_tuples) - { - offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData)); - page = BufferGetPage(so->curbuf); - if (ItemIdDeleted(PageGetItemId(page, offnum))) - continue; - } + res = ( gistnext(scan, dir, &tid, 1, scan->ignore_killed_tuples) ) ? true : false; - PG_RETURN_BOOL(res); - } + PG_RETURN_BOOL(res); } Datum @@ -85,36 +117,28 @@ gistgetmulti(PG_FUNCTION_ARGS) ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1); int32 max_tids = PG_GETARG_INT32(2); int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3); - bool res = true; - int32 ntids = 0; - /* XXX generic implementation: loop around guts of gistgettuple */ - while (ntids < max_tids) - { - res = gistnext(scan, ForwardScanDirection); - if (!res) - break; - tids[ntids] = scan->xs_ctup.t_self; - ntids++; - } - - *returned_tids = ntids; - PG_RETURN_BOOL(res); + *returned_tids = gistnext(scan, ForwardScanDirection, tids, max_tids, false); + + PG_RETURN_BOOL(*returned_tids == max_tids); } /* - * Fetch a tuple that matchs the search key; this can be invoked + * Fetch a tuples that matchs the search key; this can be invoked * either to fetch the first such tuple or subsequent matching * tuples. Returns true iff a matching tuple was found. 
*/ -static bool -gistnext(IndexScanDesc scan, ScanDirection dir) +static int +gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples) { Page p; OffsetNumber n; GISTScanOpaque so; - GISTSTACK *stk; + GISTSearchStack *stk; IndexTuple it; + GISTPageOpaque opaque; + bool resetoffset=false; + int ntids=0; so = (GISTScanOpaque) scan->opaque; @@ -122,107 +146,164 @@ gistnext(IndexScanDesc scan, ScanDirection dir) { /* Being asked to fetch the first entry, so start at the root */ Assert(so->curbuf == InvalidBuffer); - so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO); - } + Assert(so->stack == NULL); - p = BufferGetPage(so->curbuf); + so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO); + + stk = so->stack = (GISTSearchStack*) palloc0( sizeof(GISTSearchStack) ); - if (ItemPointerIsValid(&scan->currentItemData) == false) - { - if (ScanDirectionIsBackward(dir)) - n = PageGetMaxOffsetNumber(p); - else - n = FirstOffsetNumber; + stk->next = NULL; + stk->block = GIST_ROOT_BLKNO; + } else if ( so->curbuf == InvalidBuffer ) { + return 0; } - else - { - n = ItemPointerGetOffsetNumber(&(scan->currentItemData)); - if (ScanDirectionIsBackward(dir)) - n = OffsetNumberPrev(n); - else - n = OffsetNumberNext(n); - } + for(;;) { + /* First of all, we need lock buffer */ + Assert( so->curbuf != InvalidBuffer ); + LockBuffer( so->curbuf, GIST_SHARE ); + p = BufferGetPage(so->curbuf); + opaque = GistPageGetOpaque( p ); + resetoffset = false; + + if ( XLogRecPtrIsInvalid( so->stack->lsn ) || !XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) { + /* page changed from last visit or visit first time , reset offset */ + so->stack->lsn = PageGetLSN(p); + resetoffset = true; + + /* check page split, occured from last visit or visit to parent */ + if ( !XLogRecPtrIsInvalid( so->stack->parentlsn ) && + XLByteLT( so->stack->parentlsn, opaque->nsn ) && + opaque->rightlink != InvalidBlockNumber /* sanity check */ && + 
(so->stack->next==NULL || so->stack->next->block != opaque->rightlink) /* check if already added */) { + /* detect page split, follow right link to add pages */ + + stk = (GISTSearchStack*) palloc( sizeof(GISTSearchStack) ); + stk->next = so->stack->next; + stk->block = opaque->rightlink; + stk->parentlsn = so->stack->parentlsn; + memset( &(stk->lsn), 0, sizeof(GistNSN) ); + so->stack->next = stk; + } + } - for (;;) - { - n = gistfindnext(scan, n, dir); + /* if page is empty, then just skip it */ + if ( PageIsEmpty(p) ) { + LockBuffer( so->curbuf, GIST_UNLOCK ); + stk = so->stack->next; + pfree( so->stack ); + so->stack = stk; - if (!OffsetNumberIsValid(n)) - { - /* - * We ran out of matching index entries on the current - * page, so pop the top stack entry and use it to continue - * the search. - */ - /* If we're out of stack entries, we're done */ - if (so->stack == NULL) - { + if (so->stack == NULL) { ReleaseBuffer(so->curbuf); so->curbuf = InvalidBuffer; - return false; + return ntids; } - stk = so->stack; so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, - stk->block); - p = BufferGetPage(so->curbuf); + stk->block); + continue; + } + if (!GistPageIsLeaf(p) || resetoffset || ItemPointerIsValid(&scan->currentItemData) == false) + { if (ScanDirectionIsBackward(dir)) - n = OffsetNumberPrev(stk->offset); + n = PageGetMaxOffsetNumber(p); else - n = OffsetNumberNext(stk->offset); - - so->stack = stk->parent; - pfree(stk); - - continue; + n = FirstOffsetNumber; + } + else + { + n = ItemPointerGetOffsetNumber(&(scan->currentItemData)); + + if (ScanDirectionIsBackward(dir)) + n = OffsetNumberPrev(n); + else + n = OffsetNumberNext(n); } - if (GistPageIsLeaf(p)) + /* wonderfull, we can look at page */ + + for(;;) { - /* - * We've found a matching index entry in a leaf page, so - * return success. Note that we keep "curbuf" pinned so - * that we can efficiently resume the index scan later. 
- */ - ItemPointerSet(&(scan->currentItemData), + n = gistfindnext(scan, n, dir); + + if (!OffsetNumberIsValid(n)) + { + /* + * We ran out of matching index entries on the current + * page, so pop the top stack entry and use it to continue + * the search. + */ + LockBuffer( so->curbuf, GIST_UNLOCK ); + stk = so->stack->next; + pfree( so->stack ); + so->stack = stk; + + /* If we're out of stack entries, we're done */ + + if (so->stack == NULL) + { + ReleaseBuffer(so->curbuf); + so->curbuf = InvalidBuffer; + return ntids; + } + + so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, + stk->block); + /* XXX go up */ + break; + } + + if (GistPageIsLeaf(p)) + { + /* + * We've found a matching index entry in a leaf page, so + * return success. Note that we keep "curbuf" pinned so + * that we can efficiently resume the index scan later. + */ + + ItemPointerSet(&(scan->currentItemData), BufferGetBlockNumber(so->curbuf), n); - it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - scan->xs_ctup.t_self = it->t_tid; - return true; - } - else - { - /* - * We've found an entry in an internal node whose key is - * consistent with the search key, so continue the search - * in the pointed-to child node (i.e. we search depth - * first). Push the current node onto the stack so we - * resume searching from this node later. - */ - BlockNumber child_block; - - stk = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - stk->offset = n; - stk->block = BufferGetBlockNumber(so->curbuf); - stk->parent = so->stack; - so->stack = stk; + if ( ! 
( ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n)) ) ) { + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + tids[ntids] = scan->xs_ctup.t_self = it->t_tid; + ntids++; + + if ( ntids == maxtids ) { + LockBuffer( so->curbuf, GIST_UNLOCK ); + return ntids; + } + } + } + else + { + /* + * We've found an entry in an internal node whose key is + * consistent with the search key, so push it to stack + */ - it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - child_block = ItemPointerGetBlockNumber(&(it->t_tid)); + stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack)); - so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, - child_block); - p = BufferGetPage(so->curbuf); + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + stk->block = ItemPointerGetBlockNumber(&(it->t_tid)); + memset( &(stk->lsn), 0, sizeof(GistNSN) ); + stk->parentlsn = so->stack->lsn; + + stk->next = so->stack->next; + so->stack->next = stk; + + } if (ScanDirectionIsBackward(dir)) - n = PageGetMaxOffsetNumber(p); + n = OffsetNumberPrev(n); else - n = FirstOffsetNumber; + n = OffsetNumberNext(n); } } + + return ntids; } /* @@ -313,6 +394,7 @@ gistindex_keytest(IndexTuple tuple, * Return the offset of the first index entry that is consistent with * the search key after offset 'n' in the current page. If there are * no more consistent entries, return InvalidOffsetNumber. + * Page should be locked.... 
*/ static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir) diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c index 34a8de1829059b972ccba62bbe9885ea4f9d316c..933ca0f6c651fb10748ecf8bcf6f6d8f14a8814f 100644 --- a/src/backend/access/gist/gistscan.c +++ b/src/backend/access/gist/gistscan.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.58 2005/05/17 03:34:18 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.59 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -24,11 +24,10 @@ static void gistregscan(IndexScanDesc scan); static void gistdropscan(IndexScanDesc scan); static void gistadjone(IndexScanDesc scan, int op, BlockNumber blkno, - OffsetNumber offnum); -static void adjuststack(GISTSTACK *stk, BlockNumber blkno); -static void adjustiptr(IndexScanDesc scan, ItemPointer iptr, - int op, BlockNumber blkno, OffsetNumber offnum); -static void gistfreestack(GISTSTACK *s); + OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn); +static void adjustiptr(IndexScanDesc scan, ItemPointer iptr, GISTSearchStack *stk, + int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn); +static void gistfreestack(GISTSearchStack *s); /* * Whenever we start a GiST scan in a backend, we register it in @@ -139,7 +138,7 @@ gistmarkpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); GISTScanOpaque so; - GISTSTACK *o, + GISTSearchStack *o, *n, *tmp; @@ -156,12 +155,13 @@ gistmarkpos(PG_FUNCTION_ARGS) /* copy the parent stack from the current item data */ while (n != NULL) { - tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - tmp->offset = n->offset; + tmp = (GISTSearchStack *) palloc(sizeof(GISTSearchStack)); + tmp->lsn = n->lsn; + tmp->parentlsn = 
n->parentlsn; tmp->block = n->block; - tmp->parent = o; + tmp->next = o; o = tmp; - n = n->parent; + n = n->next; } gistfreestack(so->markstk); @@ -187,7 +187,7 @@ gistrestrpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); GISTScanOpaque so; - GISTSTACK *o, + GISTSearchStack *o, *n, *tmp; @@ -204,12 +204,13 @@ gistrestrpos(PG_FUNCTION_ARGS) /* copy the parent stack from the current item data */ while (n != NULL) { - tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - tmp->offset = n->offset; + tmp = (GISTSearchStack *) palloc(sizeof(GISTSearchStack)); + tmp->lsn = n->lsn; + tmp->parentlsn = n->parentlsn; tmp->block = n->block; - tmp->parent = o; + tmp->next = o; o = tmp; - n = n->parent; + n = n->next; } gistfreestack(so->stack); @@ -253,6 +254,7 @@ gistendscan(PG_FUNCTION_ARGS) pfree(scan->opaque); } + gistdropscan(scan); PG_RETURN_VOID(); @@ -331,16 +333,19 @@ ReleaseResources_gist(void) } void -gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum) +gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn) { GISTScanList l; Oid relid; + if ( XLogRecPtrIsInvalid(newlsn) || XLogRecPtrIsInvalid(oldlsn) ) + return; + relid = RelationGetRelid(rel); for (l = GISTScans; l != NULL; l = l->gsl_next) { if (l->gsl_scan->indexRelation->rd_id == relid) - gistadjone(l->gsl_scan, op, blkno, offnum); + gistadjone(l->gsl_scan, op, blkno, offnum, newlsn, oldlsn); } } @@ -358,20 +363,12 @@ static void gistadjone(IndexScanDesc scan, int op, BlockNumber blkno, - OffsetNumber offnum) + OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn) { - GISTScanOpaque so; - - adjustiptr(scan, &(scan->currentItemData), op, blkno, offnum); - adjustiptr(scan, &(scan->currentMarkData), op, blkno, offnum); - - so = (GISTScanOpaque) scan->opaque; + GISTScanOpaque so = (GISTScanOpaque) scan->opaque ; - if (op == GISTOP_SPLIT) - { - adjuststack(so->stack, blkno); - adjuststack(so->markstk, 
blkno); - } + adjustiptr(scan, &(scan->currentItemData), so->stack, op, blkno, offnum, newlsn, oldlsn); + adjustiptr(scan, &(scan->currentMarkData), so->markstk, op, blkno, offnum, newlsn, oldlsn); } /* @@ -383,10 +380,10 @@ gistadjone(IndexScanDesc scan, */ static void adjustiptr(IndexScanDesc scan, - ItemPointer iptr, + ItemPointer iptr, GISTSearchStack *stk, int op, BlockNumber blkno, - OffsetNumber offnum) + OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn) { OffsetNumber curoff; GISTScanOpaque so; @@ -402,7 +399,7 @@ adjustiptr(IndexScanDesc scan, { case GISTOP_DEL: /* back up one if we need to */ - if (curoff >= offnum) + if (curoff >= offnum && XLByteEQ(stk->lsn, oldlsn) ) /* the same vesrion of page */ { if (curoff > FirstOffsetNumber) { @@ -421,18 +418,9 @@ adjustiptr(IndexScanDesc scan, else so->flags |= GS_MRKBEFORE; } + stk->lsn = newlsn; } break; - - case GISTOP_SPLIT: - /* back to start of page on split */ - ItemPointerSet(iptr, blkno, FirstOffsetNumber); - if (iptr == &(scan->currentItemData)) - so->flags &= ~GS_CURBEFORE; - else - so->flags &= ~GS_MRKBEFORE; - break; - default: elog(ERROR, "Bad operation in GiST scan adjust: %d", op); } @@ -440,37 +428,12 @@ adjustiptr(IndexScanDesc scan, } } -/* - * adjuststack() -- adjust the supplied stack for a split on a page in - * the index we're scanning. - * - * If a page on our parent stack has split, we need to back up to the - * beginning of the page and rescan it. The reason for this is that - * the split algorithm for GiSTs doesn't order tuples in any useful - * way on a single page. This means on that a split, we may wind up - * looking at some heap tuples more than once. This is handled in the - * access method update code for heaps; if we've modified the tuple we - * are looking at already in this transaction, we ignore the update - * request. 
- */ -static void -adjuststack(GISTSTACK *stk, BlockNumber blkno) -{ - while (stk != NULL) - { - if (stk->block == blkno) - stk->offset = FirstOffsetNumber; - - stk = stk->parent; - } -} - static void -gistfreestack(GISTSTACK *s) +gistfreestack(GISTSearchStack *s) { while (s != NULL) { - GISTSTACK *p = s->parent; + GISTSearchStack *p = s->next; pfree(s); s = p; } diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index 735be85f25702203b700f707031baa4af593e6d8..e7c985b45956e665e1c0e93559fbe365d8c46aab 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.2 2005/06/20 10:29:36 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.3 2005/06/27 12:45:22 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -803,8 +803,12 @@ GISTInitBuffer(Buffer b, uint32 f) page = BufferGetPage(b); PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + opaque = GistPageGetOpaque(page); opaque->flags = f; + opaque->nsplited = 0; + opaque->level = 0; + opaque->rightlink = InvalidBlockNumber; + memset( &(opaque->nsn), 0, sizeof(GistNSN) ); } void @@ -856,30 +860,38 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v, } Buffer -gistReadBuffer(Relation r, BlockNumber blkno) { +gistNewBuffer(Relation r) { Buffer buffer = InvalidBuffer; + bool needLock; - if ( blkno != P_NEW ) { - buffer = ReadBuffer(r, blkno); - } else { - Page page; - - while(true) { - blkno = GetFreeIndexPage(&r->rd_node); - if (blkno == InvalidBlockNumber) - break; + while(true) { + BlockNumber blkno = GetFreeIndexPage(&r->rd_node); + if (blkno == InvalidBlockNumber) + break; - buffer = ReadBuffer(r, blkno); - page = 
BufferGetPage(buffer); + buffer = ReadBuffer(r, blkno); + if ( ConditionalLockBuffer(buffer) ) { + Page page = BufferGetPage(buffer); if ( GistPageIsDeleted( page ) ) { GistPageSetNonDeleted( page ); return buffer; - } - ReleaseBuffer( buffer ); + } else + LockBuffer(buffer, GIST_UNLOCK); } - buffer = ReadBuffer(r, P_NEW); + ReleaseBuffer( buffer ); } - + + needLock = !RELATION_IS_LOCAL(r); + + if (needLock) + LockRelationForExtension(r, ExclusiveLock); + + buffer = ReadBuffer(r, P_NEW); + LockBuffer(buffer, GIST_EXCLUSIVE); + + if (needLock) + UnlockRelationForExtension(r, ExclusiveLock); + return buffer; } diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index e462d2af596bbef55e0a0b3d8c1f7fe3b2770708..c1806025bb38e6505a8fdc32e546b2431e20aecf 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.2 2005/06/20 15:22:37 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.3 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -34,43 +34,14 @@ typedef struct { Relation index; MemoryContext opCtx; IndexBulkDeleteResult *result; - - /* path to root */ - BlockNumber *path; - int pathlen; - int curpathlen; } GistVacuum; -static void -shiftPath(GistVacuum *gv, BlockNumber blkno) { - if ( gv->pathlen == 0 ) { - gv->pathlen = 8; - gv->path = (BlockNumber*) palloc( MAXALIGN(sizeof(BlockNumber)*gv->pathlen) ); - } else if ( gv->pathlen == gv->curpathlen ) { - gv->pathlen *= 2; - gv->path = (BlockNumber*) repalloc( gv->path, MAXALIGN(sizeof(BlockNumber)*gv->pathlen) ); - } - - if ( gv->curpathlen ) - memmove( gv->path+1, gv->path, sizeof(BlockNumber)*gv->curpathlen ); - gv->curpathlen++; - gv->path[0] = blkno; -} - -static void 
-unshiftPath(GistVacuum *gv) { - gv->curpathlen--; - if ( gv->curpathlen ) - memmove( gv->path, gv->path+1, sizeof(BlockNumber)*gv->curpathlen ); -} - typedef struct { IndexTuple *itup; int ituplen; bool emptypage; } ArrayTuple; - static ArrayTuple gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { ArrayTuple res = {NULL, 0, false}; @@ -100,7 +71,6 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { completed = (ItemPointerData*)palloc( sizeof(ItemPointerData)*lencompleted ); addon=(IndexTuple*)palloc(sizeof(IndexTuple)*lenaddon); - shiftPath(gv, blkno); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { ArrayTuple chldtuple; bool needchildunion; @@ -115,8 +85,6 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { chldtuple = gistVacuumUpdate( gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)), needchildunion ); if ( chldtuple.ituplen || chldtuple.emptypage ) { - /* adjust any scans that will be affected by this deletion */ - gistadjscans(gv->index, GISTOP_DEL, blkno, i); PageIndexTupleDelete(page, i); todelete[ ntodelete++ ] = i; i--; maxoff--; @@ -180,10 +148,8 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { oldCtx = MemoryContextSwitchTo(gv->opCtx); - /* path is need to recovery because there is new pages, in a case of - crash it's needed to add inner tuple pointers on parent page */ rdata = formSplitRdata(gv->index->rd_node, blkno, - &key, gv->path, gv->curpathlen, dist); + &key, dist); MemoryContextSwitchTo(oldCtx); @@ -198,11 +164,18 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { } END_CRIT_SECTION(); - + } else { + ptr = dist; + while(ptr) { + PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp); + ptr=ptr->next; + } } ptr = dist; while(ptr) { + if ( BufferGetBlockNumber(ptr->buffer) != blkno ) + LockBuffer( ptr->buffer, GIST_UNLOCK ); WriteBuffer(ptr->buffer); ptr=ptr->next; } @@ -213,8 +186,10 @@ gistVacuumUpdate( 
GistVacuum *gv, BlockNumber blkno, bool needunion ) { ItemPointerSet(&key, blkno, TUPLE_IS_VALID); oldCtx = MemoryContextSwitchTo(gv->opCtx); - gistnewroot(gv->index, res.itup, res.ituplen, &key); + gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key); MemoryContextSwitchTo(oldCtx); + + WriteNoReleaseBuffer(buffer); } needwrite=false; @@ -223,16 +198,15 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { needunion = false; /* gistSplit already forms unions */ } else { + /* enough free space */ OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(page)); - /* enough free space */ gistfillbuffer(gv->index, page, addon, curlenaddon, off); } } - unshiftPath(gv); } if ( needunion ) { @@ -272,22 +246,22 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) { if ( !gv->index->rd_istemp ) { XLogRecData *rdata; XLogRecPtr recptr; - MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx); + char *xlinfo; - /* In a vacuum, it's not need to push path, because - there is no new inserted keys */ rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete, - res.emptypage, addon, curlenaddon, NULL, NULL, 0); - MemoryContextSwitchTo(oldCtx); - + res.emptypage, addon, curlenaddon, NULL ); + xlinfo = rdata->data; START_CRIT_SECTION(); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); END_CRIT_SECTION(); - MemoryContextReset(gv->opCtx); - } + + pfree( xlinfo ); + pfree( rdata ); + } else + PageSetLSN(page, XLogRecPtrForTemp); WriteBuffer( buffer ); } else ReleaseBuffer( buffer ); @@ -318,22 +292,20 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) { BlockNumber npages, blkno; BlockNumber nFreePages, *freePages, maxFreePages; BlockNumber lastBlock = GIST_ROOT_BLKNO, lastFilledBlock = GIST_ROOT_BLKNO; - - /* LockRelation(rel, AccessExclusiveLock); */ + bool needLock; /* gistVacuumUpdate may cause hard work */ if ( 
info->vacuum_full ) { GistVacuum gv; ArrayTuple res; + LockRelation(rel, AccessExclusiveLock); + gv.index = rel; initGISTstate(&(gv.giststate), rel); gv.opCtx = createTempGistContext(); gv.result = stats; - gv.path=NULL; - gv.pathlen = gv.curpathlen = 0; - /* walk through the entire index for update tuples */ res = gistVacuumUpdate( &gv, GIST_ROOT_BLKNO, false ); /* cleanup */ @@ -343,8 +315,6 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) { pfree( res.itup[i] ); pfree( res.itup ); } - if ( gv.path ) - pfree( gv.path ); freeGISTstate(&(gv.giststate)); MemoryContextDelete(gv.opCtx); } else if (needFullVacuum) { @@ -354,16 +324,29 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) { needFullVacuum = false; + needLock = !RELATION_IS_LOCAL(rel); + if ( info->vacuum_full ) + needLock = false; /* relation locked with AccessExclusiveLock */ + /* try to find deleted pages */ + if (needLock) + LockRelationForExtension(rel, ExclusiveLock); npages = RelationGetNumberOfBlocks(rel); - maxFreePages = RelationGetNumberOfBlocks(rel); + if (needLock) + UnlockRelationForExtension(rel, ExclusiveLock); + + maxFreePages = npages; if ( maxFreePages > MaxFSMPages ) maxFreePages = MaxFSMPages; + nFreePages = 0; freePages = (BlockNumber*) palloc (sizeof(BlockNumber) * maxFreePages); for(blkno=GIST_ROOT_BLKNO+1;blkno 0 ) { - if ( info->vacuum_full ) { /* try to truncate index */ - int i; - for(i=0;i= lastFilledBlock ) { - nFreePages = i; - break; - } + if ( info->vacuum_full && nFreePages>0 ) { /* try to truncate index */ + int i; + for(i=0;i= lastFilledBlock ) { + nFreePages = i; + break; + } - if ( lastBlock > lastFilledBlock ) - RelationTruncate( rel, lastFilledBlock+1 ); - stats->pages_removed = lastBlock - lastFilledBlock; - } - - if ( nFreePages > 0 ) - RecordIndexFreeSpace( &rel->rd_node, nFreePages, freePages ); + if ( lastBlock > lastFilledBlock ) + RelationTruncate( rel, lastFilledBlock+1 ); + stats->pages_removed = lastBlock - lastFilledBlock; } + + RecordIndexFreeSpace( &rel->rd_node, nFreePages, 
freePages ); pfree( freePages ); /* return statistics */ stats->pages_free = nFreePages; + if (needLock) + LockRelationForExtension(rel, ExclusiveLock); stats->num_pages = RelationGetNumberOfBlocks(rel); + if (needLock) + UnlockRelationForExtension(rel, ExclusiveLock); - /* UnlockRelation(rel, AccessExclusiveLock); */ + if (info->vacuum_full) + UnlockRelation(rel, AccessExclusiveLock); PG_RETURN_POINTER(stats); } typedef struct GistBDItem { + GistNSN parentlsn; BlockNumber blkno; struct GistBDItem *next; } GistBDItem; +static void +pushStackIfSplited(Page page, GistBDItem *stack) { + GISTPageOpaque opaque = GistPageGetOpaque(page); + + if ( stack->blkno!=GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid( stack->parentlsn ) && + XLByteLT( stack->parentlsn, opaque->nsn) && + opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { + /* split page detected, install right link to the stack */ + + GistBDItem *ptr = (GistBDItem*) palloc(sizeof(GistBDItem)); + ptr->blkno = opaque->rightlink; + ptr->parentlsn = stack->parentlsn; + ptr->next = stack->next; + stack->next = ptr; + } +} + + /* * Bulk deletion of all index entries pointing to a set of heap tuples and - * update invalid tuples after crash recovery. + * check invalid tuples after crash recovery. * The set of target tuples is specified via a callback routine that tells * whether any given heap tuple (identified by ItemPointer) is being deleted. 
* @@ -424,49 +429,99 @@ gistbulkdelete(PG_FUNCTION_ARGS) { void* callback_state = (void *) PG_GETARG_POINTER(2); IndexBulkDeleteResult *result = (IndexBulkDeleteResult*)palloc0(sizeof(IndexBulkDeleteResult)); GistBDItem *stack, *ptr; - MemoryContext opCtx = createTempGistContext(); + bool needLock; - stack = (GistBDItem*) palloc(sizeof(GistBDItem)); + stack = (GistBDItem*) palloc0(sizeof(GistBDItem)); stack->blkno = GIST_ROOT_BLKNO; - stack->next = NULL; needFullVacuum = false; while( stack ) { Buffer buffer = ReadBuffer(rel, stack->blkno); - Page page = (Page) BufferGetPage(buffer); - OffsetNumber i, maxoff = PageGetMaxOffsetNumber(page); + Page page; + OffsetNumber i, maxoff; IndexTuple idxtuple; ItemId iid; - OffsetNumber *todelete = NULL; - int ntodelete = 0; + + LockBuffer(buffer, GIST_SHARE); + page = (Page) BufferGetPage(buffer); if ( GistPageIsLeaf(page) ) { - ItemPointerData heapptr; + OffsetNumber *todelete = NULL; + int ntodelete = 0; + + LockBuffer(buffer, GIST_UNLOCK); + LockBuffer(buffer, GIST_EXCLUSIVE); + + page = (Page) BufferGetPage(buffer); + if ( stack->blkno==GIST_ROOT_BLKNO && !GistPageIsLeaf(page) ) { + /* the only root can become non-leaf during relock */ + LockBuffer(buffer, GIST_UNLOCK); + ReleaseBuffer(buffer); + /* one more check */ + continue; + } - todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*maxoff) ); + /* check for split proceeded after look at parent, + we should check it after relock */ + pushStackIfSplited(page, stack); + + maxoff = PageGetMaxOffsetNumber(page); + todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*(maxoff+1)) ); for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) { iid = PageGetItemId(page, i); idxtuple = (IndexTuple) PageGetItem(page, iid); - heapptr = idxtuple->t_tid; - if ( callback(&heapptr, callback_state) ) { - gistadjscans(rel, GISTOP_DEL, stack->blkno, i); + if ( callback(&(idxtuple->t_tid), callback_state) ) { PageIndexTupleDelete(page, i); - todelete[ ntodelete++ ] = 
i; - i--; maxoff--; + todelete[ ntodelete ] = i; + i--; maxoff--; ntodelete++; result->tuples_removed += 1; + Assert( maxoff == PageGetMaxOffsetNumber(page) ); } else result->num_index_tuples += 1; } + + if ( ntodelete ) { + GistMarkTuplesDeleted(page); + + if (!rel->rd_istemp ) { + XLogRecData *rdata; + XLogRecPtr recptr; + gistxlogEntryUpdate *xlinfo; + + rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete, + false, NULL, 0, NULL); + xlinfo = (gistxlogEntryUpdate*)rdata->data; + + START_CRIT_SECTION(); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + END_CRIT_SECTION(); + + pfree( xlinfo ); + pfree( rdata ); + } else + PageSetLSN(page, XLogRecPtrForTemp); + WriteNoReleaseBuffer( buffer ); + } + + pfree( todelete ); } else { + /* check for split proceeded after look at parent */ + pushStackIfSplited(page, stack); + + maxoff = PageGetMaxOffsetNumber(page); + for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) { iid = PageGetItemId(page, i); idxtuple = (IndexTuple) PageGetItem(page, iid); ptr = (GistBDItem*) palloc(sizeof(GistBDItem)); ptr->blkno = ItemPointerGetBlockNumber( &(idxtuple->t_tid) ); + ptr->parentlsn = PageGetLSN( page ); ptr->next = stack->next; stack->next = ptr; @@ -475,33 +530,9 @@ gistbulkdelete(PG_FUNCTION_ARGS) { } } - if ( ntodelete && todelete ) { - GistMarkTuplesDeleted(page); - - if (!rel->rd_istemp ) { - XLogRecData *rdata; - XLogRecPtr recptr; - MemoryContext oldCtx = MemoryContextSwitchTo(opCtx); - - rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete, - false, NULL, 0, NULL, NULL, 0); - MemoryContextSwitchTo(oldCtx); - - START_CRIT_SECTION(); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - END_CRIT_SECTION(); - - MemoryContextReset(opCtx); - } - - WriteBuffer( buffer ); - } else - ReleaseBuffer( buffer ); + LockBuffer( buffer, 
GIST_UNLOCK ); + ReleaseBuffer( buffer ); - if ( todelete ) - pfree( todelete ); ptr = stack->next; pfree( stack ); @@ -510,10 +541,13 @@ gistbulkdelete(PG_FUNCTION_ARGS) { vacuum_delay_point(); } - MemoryContextDelete( opCtx ); + needLock = !RELATION_IS_LOCAL(rel); + if (needLock) + LockRelationForExtension(rel, ExclusiveLock); result->num_pages = RelationGetNumberOfBlocks(rel); - + if (needLock) + UnlockRelationForExtension(rel, ExclusiveLock); PG_RETURN_POINTER( result ); } diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index d2f2697affa88b06bd4eb611aa636fa7b10d0fc3..de89789496063b1b51ea42bcddd25cb77b539192 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.3 2005/06/20 15:22:37 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.4 2005/06/27 12:45:22 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -27,7 +27,6 @@ typedef struct { gistxlogEntryUpdate *data; int len; IndexTuple *itup; - BlockNumber *path; OffsetNumber *todelete; } EntryUpdateRecord; @@ -39,7 +38,6 @@ typedef struct { typedef struct { gistxlogPageSplit *data; NewPage *page; - BlockNumber *path; } PageSplitRecord; /* track for incomplete inserts, idea was taken from nbtxlog.c */ @@ -49,9 +47,9 @@ typedef struct gistIncompleteInsert { ItemPointerData key; int lenblk; BlockNumber *blkno; - int pathlen; - BlockNumber *path; XLogRecPtr lsn; + BlockNumber *path; + int pathlen; } gistIncompleteInsert; @@ -69,7 +67,6 @@ static List *incomplete_inserts; static void pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, BlockNumber *blkno, int lenblk, - BlockNumber *path, int pathlen, PageSplitRecord *xlinfo /* to extract blkno info */ ) { MemoryContext 
oldCxt = MemoryContextSwitchTo(insertCtx); gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) ); @@ -93,15 +90,6 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, } Assert( ninsert->lenblk>0 ); - if ( path && pathlen ) { - ninsert->pathlen = pathlen; - ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen ); - memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen); - } else { - ninsert->pathlen = 0; - ninsert->path = NULL; - } - incomplete_inserts = lappend(incomplete_inserts, ninsert); MemoryContextSwitchTo(oldCxt); } @@ -116,7 +104,6 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) { if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) { /* found */ - if ( insert->path ) pfree( insert->path ); pfree( insert->blkno ); incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); pfree( insert ); @@ -132,15 +119,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { decoded->data = (gistxlogEntryUpdate*)begin; - if ( decoded->data->pathlen ) { - addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); - decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); - } else - decoded->path = NULL; - if ( decoded->data->ntodelete ) { decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath); - addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); + addpath = MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); } else decoded->todelete = NULL; @@ -244,7 +225,6 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO ) pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, &(xlrec.data->blkno), 1, - xlrec.path, xlrec.data->pathlen, NULL); } } @@ -252,18 +232,12 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool 
isnewroot) { static void decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { char *begin = XLogRecGetData(record), *ptr; - int j,i=0, addpath = 0; + int j,i=0; decoded->data = (gistxlogPageSplit*)begin; decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage ); - if ( decoded->data->pathlen ) { - addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); - decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit )); - } else - decoded->path = NULL; - - ptr=begin+sizeof( gistxlogPageSplit ) + addpath; + ptr=begin+sizeof( gistxlogPageSplit ); for(i=0;idata->npage;i++) { Assert( ptr - begin < record->xl_len ); decoded->page[i].header = (gistxlogPage*)ptr; @@ -342,7 +316,6 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, NULL, 0, - xlrec.path, xlrec.data->pathlen, &xlrec); } } @@ -499,6 +472,36 @@ gist_form_invalid_tuple(BlockNumber blkno) { return tuple; } +static void +gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) { + int i; + GISTInsertStack *top; + + insert->pathlen = 0; + insert->path = NULL; + + for(i=0;insert->lenblk;i++) { + if ( (top=gistFindPath(index, insert->blkno[i], XLogReadBuffer)) != NULL ) { + GISTInsertStack *ptr=top; + while(ptr) { + insert->pathlen++; + ptr = ptr->parent; + } + + insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen ); + + i=0; + ptr = top; + while(ptr) { + insert->path[i] = ptr->blkno; + i++; + ptr = ptr->parent; + } + break; + } + } +} + static void gistContinueInsert(gistIncompleteInsert *insert) { IndexTuple *itup; @@ -523,6 +526,9 @@ gistContinueInsert(gistIncompleteInsert *insert) { for(i=0;ilenblk;i++) itup[i] = gist_form_invalid_tuple( insert->blkno[i] ); + /* construct path */ + gixtxlogFindPath( index, insert ); + if ( insert->pathlen==0 ) { /*it was split root, so we should only make new root*/ Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO); @@ 
-662,8 +668,7 @@ gist_xlog_cleanup(void) { XLogRecData * formSplitRdata(RelFileNode node, BlockNumber blkno, - ItemPointer key, - BlockNumber *path, int pathlen, SplitedPageLayout *dist ) { + ItemPointer key, SplitedPageLayout *dist ) { XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit)); @@ -681,7 +686,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, xlrec->node = node; xlrec->origblkno = blkno; xlrec->npage = (uint16)npage; - xlrec->pathlen = (uint16)pathlen; if ( key ) xlrec->key = *key; else @@ -692,15 +696,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, rdata[0].len = sizeof( gistxlogPageSplit ); rdata[0].next = NULL; - if ( pathlen ) { - rdata[cur-1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)path; - rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); - rdata[cur].next = NULL; - cur++; - } - ptr=dist; while(ptr) { rdata[cur].buffer = InvalidBuffer; @@ -725,8 +720,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, XLogRecData * formUpdateRdata(RelFileNode node, BlockNumber blkno, OffsetNumber *todelete, int ntodelete, bool emptypage, - IndexTuple *itup, int ituplen, ItemPointer key, - BlockNumber *path, int pathlen) { + IndexTuple *itup, int ituplen, ItemPointer key ) { XLogRecData *rdata; gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate)); @@ -740,7 +734,6 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno, if ( emptypage ) { xlrec->isemptypage = true; xlrec->ntodelete = 0; - xlrec->pathlen = 0; rdata = (XLogRecData*)palloc( sizeof(XLogRecData) ); rdata->buffer = InvalidBuffer; @@ -752,24 +745,14 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno, xlrec->isemptypage = false; xlrec->ntodelete = ntodelete; - xlrec->pathlen = pathlen; - rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) ); + rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 2 + ituplen ) ); rdata->buffer = 
InvalidBuffer; rdata->data = (char*)xlrec; rdata->len = sizeof(gistxlogEntryUpdate); rdata->next = NULL; - if ( pathlen ) { - rdata[cur-1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)path; - rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); - rdata[cur].next = NULL; - cur++; - } - if ( ntodelete ) { rdata[cur-1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index a445efc7154cdfa0b7e0302ba7ae13c5b49a86c5..624b53d635cd0a7e9c0a2a67051bd041bba05601 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/index/indexam.c,v 1.83 2005/06/13 23:14:48 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/index/indexam.c,v 1.84 2005/06/27 12:45:22 teodor Exp $ * * INTERFACE ROUTINES * index_open - open an index relation by relation OID @@ -287,7 +287,6 @@ index_beginscan_internal(Relation indexRelation, FmgrInfo *procedure; RELATION_CHECKS; - GET_REL_PROCEDURE(ambeginscan); RelationIncrementReferenceCount(indexRelation); @@ -300,6 +299,13 @@ index_beginscan_internal(Relation indexRelation, */ LockRelation(indexRelation, AccessShareLock); + /* + * LockRelation can clean rd_aminfo structure, so fill procedure + * after LockRelation + */ + + GET_REL_PROCEDURE(ambeginscan); + /* * Tell the AM to open a scan. 
*/ diff --git a/src/include/access/gist.h b/src/include/access/gist.h index bf9c1c712bb7ca7a274a57276a7099aa7ef81819..ee060e83c2bfb7f636338987dd04252fe17f2f20 100644 --- a/src/include/access/gist.h +++ b/src/include/access/gist.h @@ -9,7 +9,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.47 2005/06/20 10:29:36 teodor Exp $ + * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.48 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -19,6 +19,8 @@ #include "storage/bufpage.h" #include "storage/off.h" #include "utils/rel.h" +#include "access/xlog.h" +#include "access/xlogdefs.h" /* * amproc indexes for GiST indexes. @@ -39,9 +41,22 @@ #define F_DELETED (1 << 1) #define F_TUPLES_DELETED (1 << 2) +typedef XLogRecPtr GistNSN; + typedef struct GISTPageOpaqueData { - uint32 flags; + uint8 flags; + + /* number page to which current one is splitted in last split */ + uint8 nsplited; + + /* level of page, 0 - leaf */ + uint16 level; + BlockNumber rightlink; + + /* the only meaning - change this value if + page split. 
*/ + GistNSN nsn; } GISTPageOpaqueData; typedef GISTPageOpaqueData *GISTPageOpaque; @@ -90,18 +105,20 @@ typedef struct GISTENTRY bool leafkey; } GISTENTRY; -#define GistPageIsLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_LEAF) +#define GistPageGetOpaque(page) ( (GISTPageOpaque) PageGetSpecialPointer(page) ) + +#define GistPageIsLeaf(page) ( GistPageGetOpaque(page)->flags & F_LEAF) #define GIST_LEAF(entry) (GistPageIsLeaf((entry)->page)) -#define GistPageSetLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_LEAF) -#define GistPageSetNonLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_LEAF) +#define GistPageSetLeaf(page) ( GistPageGetOpaque(page)->flags |= F_LEAF) +#define GistPageSetNonLeaf(page) ( GistPageGetOpaque(page)->flags &= ~F_LEAF) -#define GistPageIsDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_DELETED) -#define GistPageSetDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_DELETED) -#define GistPageSetNonDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_DELETED) +#define GistPageIsDeleted(page) ( GistPageGetOpaque(page)->flags & F_DELETED) +#define GistPageSetDeleted(page) ( GistPageGetOpaque(page)->flags |= F_DELETED) +#define GistPageSetNonDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_DELETED) -#define GistTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_TUPLES_DELETED) -#define GistMarkTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_TUPLES_DELETED) -#define GistClearTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_TUPLES_DELETED) +#define GistTuplesDeleted(page) ( GistPageGetOpaque(page)->flags & F_TUPLES_DELETED) +#define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED) +#define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED) /* * Vector of 
GISTENTRY structs; user-defined methods union and pick diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 54bf1957596e069b65dc9c864298e5c579e391a8..6ea4dccb688ac100cebe6b4d6ee61cc9b2a5268c 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.5 2005/06/20 15:22:38 teodor Exp $ + * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.6 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -20,7 +20,13 @@ #include "access/xlogdefs.h" #include "fmgr.h" +#define GIST_UNLOCK BUFFER_LOCK_UNLOCK +#define GIST_SHARE BUFFER_LOCK_SHARE +#define GIST_EXCLUSIVE BUFFER_LOCK_EXCLUSIVE + + /* + * XXX old comment!!! * When we descend a tree, we keep a stack of parent pointers. This * allows us to follow a chain of internal node points until we reach * a leaf node, and then back up the stack to re-examine the internal @@ -31,12 +37,15 @@ * the node's page that we stopped at (i.e. we followed the child * pointer located at the specified offset). 
*/ -typedef struct GISTSTACK +typedef struct GISTSearchStack { - struct GISTSTACK *parent; - OffsetNumber offset; + struct GISTSearchStack *next; BlockNumber block; -} GISTSTACK; + /* to detect that the page has changed */ + GistNSN lsn; + /* to recognize that a split occurred */ + GistNSN parentlsn; +} GISTSearchStack; typedef struct GISTSTATE { @@ -57,8 +66,8 @@ typedef struct GISTSTATE */ typedef struct GISTScanOpaqueData { - GISTSTACK *stack; - GISTSTACK *markstk; + GISTSearchStack *stack; + GISTSearchStack *markstk; uint16 flags; GISTSTATE *giststate; MemoryContext tempCxt; @@ -68,6 +77,71 @@ typedef struct GISTScanOpaqueData typedef GISTScanOpaqueData *GISTScanOpaque; +/* XLog stuff */ +extern const XLogRecPtr XLogRecPtrForTemp; + +#define XLOG_GIST_ENTRY_UPDATE 0x00 +#define XLOG_GIST_ENTRY_DELETE 0x10 +#define XLOG_GIST_NEW_ROOT 0x20 + +typedef struct gistxlogEntryUpdate { + RelFileNode node; + BlockNumber blkno; + + uint16 ntodelete; + bool isemptypage; + + /* + * It is used to identify completeness of insert. + * Set to the leaf itup + */ + ItemPointerData key; + + /* follow: + * 1. todelete OffsetNumbers + * 2. tuples to insert + */ +} gistxlogEntryUpdate; + +#define XLOG_GIST_PAGE_SPLIT 0x30 + +typedef struct gistxlogPageSplit { + RelFileNode node; + BlockNumber origblkno; /* page that was split */ + uint16 npage; + + /* see comments on gistxlogEntryUpdate */ + ItemPointerData key; + + /* follow: + * 1. 
gistxlogPage and array of IndexTupleData per page + */ +} gistxlogPageSplit; + +#define XLOG_GIST_INSERT_COMPLETE 0x40 + +typedef struct gistxlogPage { + BlockNumber blkno; + int num; +} gistxlogPage; + +#define XLOG_GIST_CREATE_INDEX 0x50 + +typedef struct gistxlogInsertComplete { + RelFileNode node; + /* follows ItemPointerData key to clean */ +} gistxlogInsertComplete; + +/* SplitedPageLayout - gistSplit function result */ +typedef struct SplitedPageLayout { + gistxlogPage block; + IndexTupleData *list; + int lenlist; + Buffer buffer; /* to write after all proceed */ + + struct SplitedPageLayout *next; +} SplitedPageLayout; + /* * GISTInsertStack used for locking buffers and transfer arguments during * insertion @@ -78,16 +152,25 @@ typedef struct GISTInsertStack { BlockNumber blkno; Buffer buffer; Page page; + + /* log sequence number from page->lsn to + recognize page update and compare it with page's nsn + to recognize page split*/ + GistNSN lsn; /* child's offset */ OffsetNumber childoffnum; - /* pointer to parent */ + /* pointer to parent and child */ struct GISTInsertStack *parent; + struct GISTInsertStack *child; - bool todelete; + /* for gistFindPath */ + struct GISTInsertStack *next; } GISTInsertStack; +#define XLogRecPtrIsInvalid( r ) ( (r).xlogid == 0 && (r).xrecoff == 0 ) + typedef struct { Relation r; IndexTuple *itup; /* in/out, points to compressed entry */ @@ -97,10 +180,6 @@ typedef struct { /* pointer to heap tuple */ ItemPointerData key; - - /* path to stroe in XLog */ - BlockNumber *path; - int pathlen; } GISTInsertState; /* @@ -124,7 +203,7 @@ typedef struct { * constants tell us what sort of operation changed the index. 
*/ #define GISTOP_DEL 0 -#define GISTOP_SPLIT 1 +/* #define GISTOP_SPLIT 1 */ #define ATTSIZE(datum, tupdesc, i, isnull) \ ( \ @@ -132,64 +211,6 @@ typedef struct { att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \ ) -/* XLog stuff */ -#define XLOG_GIST_ENTRY_UPDATE 0x00 -#define XLOG_GIST_ENTRY_DELETE 0x10 -#define XLOG_GIST_NEW_ROOT 0x20 - -typedef struct gistxlogEntryUpdate { - RelFileNode node; - BlockNumber blkno; - - uint16 ntodelete; - uint16 pathlen; - bool isemptypage; - - /* - * It used to identify completeness of insert. - * Sets to leaf itup - */ - ItemPointerData key; - - /* follow: - * 1. path to root (BlockNumber) - * 2. todelete OffsetNumbers - * 3. tuples to insert - */ -} gistxlogEntryUpdate; - -#define XLOG_GIST_PAGE_SPLIT 0x30 - -typedef struct gistxlogPageSplit { - RelFileNode node; - BlockNumber origblkno; /*splitted page*/ - uint16 pathlen; - uint16 npage; - - /* see comments on gistxlogEntryUpdate */ - ItemPointerData key; - - /* follow: - * 1. path to root (BlockNumber) - * 2. 
gistxlogPage and array of IndexTupleData per page - */ -} gistxlogPageSplit; - -typedef struct gistxlogPage { - BlockNumber blkno; - int num; -} gistxlogPage; - - -#define XLOG_GIST_INSERT_COMPLETE 0x40 - -typedef struct gistxlogInsertComplete { - RelFileNode node; - /* follows ItemPointerData key to clean */ -} gistxlogInsertComplete; - -#define XLOG_GIST_CREATE_INDEX 0x50 - /* * mark tuples on inner pages during recovery */ @@ -206,20 +227,14 @@ extern Datum gistinsert(PG_FUNCTION_ARGS); extern MemoryContext createTempGistContext(void); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void freeGISTstate(GISTSTATE *giststate); -extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key); extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate); +extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key); -typedef struct SplitedPageLayout { - gistxlogPage block; - IndexTupleData *list; - int lenlist; - Buffer buffer; /* to write after all proceed */ - - struct SplitedPageLayout *next; -} SplitedPageLayout; - -IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup, +extern IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup, int *len, SplitedPageLayout **dist, GISTSTATE *giststate); + +extern GISTInsertStack* gistFindPath( Relation r, BlockNumber child, + Buffer (*myReadBuffer)(bool, Relation, BlockNumber) ); /* gistxlog.c */ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(char *buf, uint8 xl_info, char *rec); @@ -229,12 +244,10 @@ extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); extern XLogRecData* formUpdateRdata(RelFileNode node, BlockNumber blkno, OffsetNumber *todelete, int ntodelete, bool emptypage, - IndexTuple *itup, int ituplen, ItemPointer key, - BlockNumber *path, int pathlen); + IndexTuple *itup, int ituplen, ItemPointer key); extern XLogRecData* formSplitRdata(RelFileNode node, BlockNumber 
blkno, - ItemPointer key, - BlockNumber *path, int pathlen, SplitedPageLayout *dist ); + ItemPointer key, SplitedPageLayout *dist); extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len); @@ -243,7 +256,7 @@ extern Datum gistgettuple(PG_FUNCTION_ARGS); extern Datum gistgetmulti(PG_FUNCTION_ARGS); /* gistutil.c */ -extern Buffer gistReadBuffer(Relation r, BlockNumber blkno); +extern Buffer gistNewBuffer(Relation r); extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, int len, OffsetNumber off); extern bool gistnospace(Page page, IndexTuple *itvec, int len); diff --git a/src/include/access/gistscan.h b/src/include/access/gistscan.h index 12a7a0ea73aeb4b0bf949a1d3db2c11ff436a6ee..8920f023fa6138f7495a2dbba292da3dea981622 100644 --- a/src/include/access/gistscan.h +++ b/src/include/access/gistscan.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/gistscan.h,v 1.26 2004/12/31 22:03:21 pgsql Exp $ + * $PostgreSQL: pgsql/src/include/access/gistscan.h,v 1.27 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -15,13 +15,14 @@ #define GISTSCAN_H #include "access/relscan.h" +#include "access/xlogdefs.h" extern Datum gistbeginscan(PG_FUNCTION_ARGS); extern Datum gistrescan(PG_FUNCTION_ARGS); extern Datum gistmarkpos(PG_FUNCTION_ARGS); extern Datum gistrestrpos(PG_FUNCTION_ARGS); extern Datum gistendscan(PG_FUNCTION_ARGS); -extern void gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum); +extern void gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn); extern void ReleaseResources_gist(void); #endif /* GISTSCAN_H */ diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 
ab1b6c0a5dd7eed4dbb167250755544e93ce2934..1e5c7ce0fa0f6c2ab460e17a223a9cea8428e1df 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.281 2005/06/24 20:53:31 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.282 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 200506241 +#define CATALOG_VERSION_NO 200506271 #endif diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h index b5ecdd9c7389c81963122a4b51e68e6416eb5ab4..0f05c48c84482fec81c1ece9c5c48e050d1044b5 100644 --- a/src/include/catalog/pg_am.h +++ b/src/include/catalog/pg_am.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.36 2005/06/24 20:53:31 tgl Exp $ + * $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.37 2005/06/27 12:45:23 teodor Exp $ * * NOTES * the genbki.sh script reads this file and generates .bki @@ -112,7 +112,7 @@ DESCR("b-tree index access method"); DATA(insert OID = 405 ( hash 1 1 0 f f f f t hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete - hashcostestimate )); DESCR("hash index access method"); #define HASH_AM_OID 405 -DATA(insert OID = 783 ( gist 100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate )); +DATA(insert OID = 783 ( gist 100 7 0 f t f f t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan 
gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate )); DESCR("GiST index access method"); #define GIST_AM_OID 783