提交 e8cab5fe 编写于 作者: T Teodor Sigaev

Concurrency for GiST

- full concurrency for insert/update/select/vacuum:
        - select and vacuum never lock more than one page simultaneously
        - select (gettuple) holds no lock across its calls
        - insert never locks more than two pages simultaneously:
                - during the search for the leaf to insert into, it locks only
                  one page at a time
                - while walking upward to the root, it locks only the parent
                  (which may be a non-direct parent) and the child. One of
                  them is X-locked; the other may be S- or X-locked
- 'vacuum full' locks index
- improve gistgetmulti
- simplify XLOG records

Fix bug in index_beginscan_internal: LockRelation may clean
  rd_aminfo structure, so move GET_REL_PROCEDURE after LockRelation
上级 c3be085a
此差异已折叠。
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.50 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -21,10 +21,63 @@
static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n,
ScanDirection dir);
static bool gistnext(IndexScanDesc scan, ScanDirection dir);
static int gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples);
static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
OffsetNumber offset);
/*
 * killtuple() -- set LP_DELETE on the line pointer of the index entry
 * identified by iptr and flag the buffer via SetBufferCommitInfoNeedsSave().
 *
 * The search starts at so->curbuf.  Each page that is inspected is locked
 * with GIST_SHARE for the duration of the inspection and unlocked before
 * moving on, so only one page is locked at any moment.
 *
 * Fast path: if we are still on so->curbuf and the page LSN equals
 * so->stack->lsn, the offset stored in iptr is used directly.
 *
 * Slow path: every item on the page is compared against iptr; if none
 * matches, the page's rightlink is followed.  The walk ends silently when
 * rightlink is InvalidBlockNumber.
 *
 * Buffers obtained here with ReadBuffer() are released before returning;
 * so->curbuf is only unlocked, never released, by this function.
 *
 * NOTE(review): the fast path treats iptr as an on-page position (its offset
 * indexes the line pointer array), while the slow path compares iptr with
 * each tuple's t_tid.  Confirm that the caller's pointer is valid under both
 * interpretations.
 */
static void
killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) {
Buffer buffer = so->curbuf;
for(;;) {
Page p;
BlockNumber blkno;
OffsetNumber offset, maxoff;
/* lock only the page currently being examined */
LockBuffer( buffer, GIST_SHARE );
p = (Page)BufferGetPage( buffer );
if ( buffer == so->curbuf && XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
/* page unchanged, so all is simple */
offset = ItemPointerGetOffsetNumber(iptr);
PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
SetBufferCommitInfoNeedsSave(buffer);
LockBuffer( buffer, GIST_UNLOCK );
break;
}
/* LSN differs (or we already moved right): look the entry up by value */
maxoff = PageGetMaxOffsetNumber( p );
for(offset = FirstOffsetNumber; offset<= maxoff; offset = OffsetNumberNext(offset)) {
IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
if ( ItemPointerEquals( &(ituple->t_tid), iptr ) ) {
/* found */
PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
SetBufferCommitInfoNeedsSave(buffer);
LockBuffer( buffer, GIST_UNLOCK );
/* drop the buffer only if it was read by this function */
if ( buffer != so->curbuf )
ReleaseBuffer( buffer );
return;
}
}
/* follow right link */
/*
 * ??? is this good? If the tuple was dropped by a concurrent vacuum,
 * we will read all leaf pages...
 */
/* fetch the link while the page is still locked */
blkno = GistPageGetOpaque(p)->rightlink;
LockBuffer( buffer, GIST_UNLOCK );
if ( buffer != so->curbuf )
ReleaseBuffer( buffer );
if ( blkno==InvalidBlockNumber )
/* not found: presumably dropped by somebody else */
return;
buffer = ReadBuffer( r, blkno );
}
}
/*
* gistgettuple() -- Get the next tuple in the scan
......@@ -34,48 +87,27 @@ gistgettuple(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
Page page;
OffsetNumber offnum;
GISTScanOpaque so;
ItemPointerData tid;
bool res;
so = (GISTScanOpaque) scan->opaque;
/*
* If we have produced an index tuple in the past and the executor
* has informed us we need to mark it as "killed", do so now.
*
* XXX: right now there is no concurrent access. In the
* future, we should (a) get a read lock on the page (b) check
* that the location of the previously-fetched tuple hasn't
* changed due to concurrent insertions.
*/
if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->curbuf);
PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
SetBufferCommitInfoNeedsSave(so->curbuf);
}
killtuple(scan->indexRelation, so, &(scan->currentItemData));
/*
* Get the next tuple that matches the search key. If asked to
* skip killed tuples, continue looping until we find a non-killed
* tuple that matches the search key.
*/
for (;;)
{
bool res = gistnext(scan, dir);
if (res == true && scan->ignore_killed_tuples)
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->curbuf);
if (ItemIdDeleted(PageGetItemId(page, offnum)))
continue;
}
res = ( gistnext(scan, dir, &tid, 1, scan->ignore_killed_tuples) ) ? true : false;
PG_RETURN_BOOL(res);
}
}
Datum
......@@ -85,36 +117,28 @@ gistgetmulti(PG_FUNCTION_ARGS)
ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1);
int32 max_tids = PG_GETARG_INT32(2);
int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3);
bool res = true;
int32 ntids = 0;
/* XXX generic implementation: loop around guts of gistgettuple */
while (ntids < max_tids)
{
res = gistnext(scan, ForwardScanDirection);
if (!res)
break;
tids[ntids] = scan->xs_ctup.t_self;
ntids++;
}
*returned_tids = gistnext(scan, ForwardScanDirection, tids, max_tids, false);
*returned_tids = ntids;
PG_RETURN_BOOL(res);
PG_RETURN_BOOL(*returned_tids == max_tids);
}
/*
* Fetch a tuple that matchs the search key; this can be invoked
* Fetch tuples that match the search key; this can be invoked
* either to fetch the first such tuple or subsequent matching
* tuples. Returns the number of matching tuples stored in 'tids'.
*/
static bool
gistnext(IndexScanDesc scan, ScanDirection dir)
static int
gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples)
{
Page p;
OffsetNumber n;
GISTScanOpaque so;
GISTSTACK *stk;
GISTSearchStack *stk;
IndexTuple it;
GISTPageOpaque opaque;
bool resetoffset=false;
int ntids=0;
so = (GISTScanOpaque) scan->opaque;
......@@ -122,12 +146,66 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
{
/* Being asked to fetch the first entry, so start at the root */
Assert(so->curbuf == InvalidBuffer);
Assert(so->stack == NULL);
so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
stk = so->stack = (GISTSearchStack*) palloc0( sizeof(GISTSearchStack) );
stk->next = NULL;
stk->block = GIST_ROOT_BLKNO;
} else if ( so->curbuf == InvalidBuffer ) {
return 0;
}
for(;;) {
/* First of all, we need lock buffer */
Assert( so->curbuf != InvalidBuffer );
LockBuffer( so->curbuf, GIST_SHARE );
p = BufferGetPage(so->curbuf);
opaque = GistPageGetOpaque( p );
resetoffset = false;
if ( XLogRecPtrIsInvalid( so->stack->lsn ) || !XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
/* page changed from last visit or visit first time , reset offset */
so->stack->lsn = PageGetLSN(p);
resetoffset = true;
/* check page split, occured from last visit or visit to parent */
if ( !XLogRecPtrIsInvalid( so->stack->parentlsn ) &&
XLByteLT( so->stack->parentlsn, opaque->nsn ) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
(so->stack->next==NULL || so->stack->next->block != opaque->rightlink) /* check if already added */) {
/* detect page split, follow right link to add pages */
stk = (GISTSearchStack*) palloc( sizeof(GISTSearchStack) );
stk->next = so->stack->next;
stk->block = opaque->rightlink;
stk->parentlsn = so->stack->parentlsn;
memset( &(stk->lsn), 0, sizeof(GistNSN) );
so->stack->next = stk;
}
}
if (ItemPointerIsValid(&scan->currentItemData) == false)
/* if page is empty, then just skip it */
if ( PageIsEmpty(p) ) {
LockBuffer( so->curbuf, GIST_UNLOCK );
stk = so->stack->next;
pfree( so->stack );
so->stack = stk;
if (so->stack == NULL) {
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
return ntids;
}
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
stk->block);
continue;
}
if (!GistPageIsLeaf(p) || resetoffset || ItemPointerIsValid(&scan->currentItemData) == false)
{
if (ScanDirectionIsBackward(dir))
n = PageGetMaxOffsetNumber(p);
......@@ -144,7 +222,9 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
n = OffsetNumberNext(n);
}
for (;;)
/* wonderfull, we can look at page */
for(;;)
{
n = gistfindnext(scan, n, dir);
......@@ -155,28 +235,24 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
* page, so pop the top stack entry and use it to continue
* the search.
*/
LockBuffer( so->curbuf, GIST_UNLOCK );
stk = so->stack->next;
pfree( so->stack );
so->stack = stk;
/* If we're out of stack entries, we're done */
if (so->stack == NULL)
{
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
return false;
return ntids;
}
stk = so->stack;
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
stk->block);
p = BufferGetPage(so->curbuf);
if (ScanDirectionIsBackward(dir))
n = OffsetNumberPrev(stk->offset);
else
n = OffsetNumberNext(stk->offset);
so->stack = stk->parent;
pfree(stk);
continue;
/* XXX go up */
break;
}
if (GistPageIsLeaf(p))
......@@ -186,43 +262,48 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
* return success. Note that we keep "curbuf" pinned so
* that we can efficiently resume the index scan later.
*/
ItemPointerSet(&(scan->currentItemData),
BufferGetBlockNumber(so->curbuf), n);
if ( ! ( ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n)) ) ) {
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
scan->xs_ctup.t_self = it->t_tid;
return true;
tids[ntids] = scan->xs_ctup.t_self = it->t_tid;
ntids++;
if ( ntids == maxtids ) {
LockBuffer( so->curbuf, GIST_UNLOCK );
return ntids;
}
}
}
else
{
/*
* We've found an entry in an internal node whose key is
* consistent with the search key, so continue the search
* in the pointed-to child node (i.e. we search depth
* first). Push the current node onto the stack so we
* resume searching from this node later.
* consistent with the search key, so push it to stack
*/
BlockNumber child_block;
stk = (GISTSTACK *) palloc(sizeof(GISTSTACK));
stk->offset = n;
stk->block = BufferGetBlockNumber(so->curbuf);
stk->parent = so->stack;
so->stack = stk;
stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
child_block = ItemPointerGetBlockNumber(&(it->t_tid));
stk->block = ItemPointerGetBlockNumber(&(it->t_tid));
memset( &(stk->lsn), 0, sizeof(GistNSN) );
stk->parentlsn = so->stack->lsn;
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
child_block);
p = BufferGetPage(so->curbuf);
stk->next = so->stack->next;
so->stack->next = stk;
}
if (ScanDirectionIsBackward(dir))
n = PageGetMaxOffsetNumber(p);
n = OffsetNumberPrev(n);
else
n = FirstOffsetNumber;
n = OffsetNumberNext(n);
}
}
return ntids;
}
/*
......@@ -313,6 +394,7 @@ gistindex_keytest(IndexTuple tuple,
* Return the offset of the first index entry that is consistent with
* the search key after offset 'n' in the current page. If there are
* no more consistent entries, return InvalidOffsetNumber.
* Page should be locked....
*/
static OffsetNumber
gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.58 2005/05/17 03:34:18 neilc Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.59 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -24,11 +24,10 @@
static void gistregscan(IndexScanDesc scan);
static void gistdropscan(IndexScanDesc scan);
static void gistadjone(IndexScanDesc scan, int op, BlockNumber blkno,
OffsetNumber offnum);
static void adjuststack(GISTSTACK *stk, BlockNumber blkno);
static void adjustiptr(IndexScanDesc scan, ItemPointer iptr,
int op, BlockNumber blkno, OffsetNumber offnum);
static void gistfreestack(GISTSTACK *s);
OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn);
static void adjustiptr(IndexScanDesc scan, ItemPointer iptr, GISTSearchStack *stk,
int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn);
static void gistfreestack(GISTSearchStack *s);
/*
* Whenever we start a GiST scan in a backend, we register it in
......@@ -139,7 +138,7 @@ gistmarkpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so;
GISTSTACK *o,
GISTSearchStack *o,
*n,
*tmp;
......@@ -156,12 +155,13 @@ gistmarkpos(PG_FUNCTION_ARGS)
/* copy the parent stack from the current item data */
while (n != NULL)
{
tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
tmp->offset = n->offset;
tmp = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
tmp->lsn = n->lsn;
tmp->parentlsn = n->parentlsn;
tmp->block = n->block;
tmp->parent = o;
tmp->next = o;
o = tmp;
n = n->parent;
n = n->next;
}
gistfreestack(so->markstk);
......@@ -187,7 +187,7 @@ gistrestrpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so;
GISTSTACK *o,
GISTSearchStack *o,
*n,
*tmp;
......@@ -204,12 +204,13 @@ gistrestrpos(PG_FUNCTION_ARGS)
/* copy the parent stack from the current item data */
while (n != NULL)
{
tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
tmp->offset = n->offset;
tmp = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
tmp->lsn = n->lsn;
tmp->parentlsn = n->parentlsn;
tmp->block = n->block;
tmp->parent = o;
tmp->next = o;
o = tmp;
n = n->parent;
n = n->next;
}
gistfreestack(so->stack);
......@@ -253,6 +254,7 @@ gistendscan(PG_FUNCTION_ARGS)
pfree(scan->opaque);
}
gistdropscan(scan);
PG_RETURN_VOID();
......@@ -331,16 +333,19 @@ ReleaseResources_gist(void)
}
void
gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum)
gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn)
{
GISTScanList l;
Oid relid;
if ( XLogRecPtrIsInvalid(newlsn) || XLogRecPtrIsInvalid(oldlsn) )
return;
relid = RelationGetRelid(rel);
for (l = GISTScans; l != NULL; l = l->gsl_next)
{
if (l->gsl_scan->indexRelation->rd_id == relid)
gistadjone(l->gsl_scan, op, blkno, offnum);
gistadjone(l->gsl_scan, op, blkno, offnum, newlsn, oldlsn);
}
}
......@@ -358,20 +363,12 @@ static void
gistadjone(IndexScanDesc scan,
int op,
BlockNumber blkno,
OffsetNumber offnum)
OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn)
{
GISTScanOpaque so;
adjustiptr(scan, &(scan->currentItemData), op, blkno, offnum);
adjustiptr(scan, &(scan->currentMarkData), op, blkno, offnum);
GISTScanOpaque so = (GISTScanOpaque) scan->opaque ;
so = (GISTScanOpaque) scan->opaque;
if (op == GISTOP_SPLIT)
{
adjuststack(so->stack, blkno);
adjuststack(so->markstk, blkno);
}
adjustiptr(scan, &(scan->currentItemData), so->stack, op, blkno, offnum, newlsn, oldlsn);
adjustiptr(scan, &(scan->currentMarkData), so->markstk, op, blkno, offnum, newlsn, oldlsn);
}
/*
......@@ -383,10 +380,10 @@ gistadjone(IndexScanDesc scan,
*/
static void
adjustiptr(IndexScanDesc scan,
ItemPointer iptr,
ItemPointer iptr, GISTSearchStack *stk,
int op,
BlockNumber blkno,
OffsetNumber offnum)
OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn)
{
OffsetNumber curoff;
GISTScanOpaque so;
......@@ -402,7 +399,7 @@ adjustiptr(IndexScanDesc scan,
{
case GISTOP_DEL:
/* back up one if we need to */
if (curoff >= offnum)
if (curoff >= offnum && XLByteEQ(stk->lsn, oldlsn) ) /* the same vesrion of page */
{
if (curoff > FirstOffsetNumber)
{
......@@ -421,18 +418,9 @@ adjustiptr(IndexScanDesc scan,
else
so->flags |= GS_MRKBEFORE;
}
stk->lsn = newlsn;
}
break;
case GISTOP_SPLIT:
/* back to start of page on split */
ItemPointerSet(iptr, blkno, FirstOffsetNumber);
if (iptr == &(scan->currentItemData))
so->flags &= ~GS_CURBEFORE;
else
so->flags &= ~GS_MRKBEFORE;
break;
default:
elog(ERROR, "Bad operation in GiST scan adjust: %d", op);
}
......@@ -440,37 +428,12 @@ adjustiptr(IndexScanDesc scan,
}
}
/*
* adjuststack() -- adjust the supplied stack for a split on a page in
* the index we're scanning.
*
* If a page on our parent stack has split, we need to back up to the
* beginning of the page and rescan it. The reason for this is that
* the split algorithm for GiSTs doesn't order tuples in any useful
* way on a single page. This means on that a split, we may wind up
* looking at some heap tuples more than once. This is handled in the
* access method update code for heaps; if we've modified the tuple we
* are looking at already in this transaction, we ignore the update
* request.
*/
static void
adjuststack(GISTSTACK *stk, BlockNumber blkno)
{
while (stk != NULL)
{
if (stk->block == blkno)
stk->offset = FirstOffsetNumber;
stk = stk->parent;
}
}
static void
gistfreestack(GISTSTACK *s)
gistfreestack(GISTSearchStack *s)
{
while (s != NULL)
{
GISTSTACK *p = s->parent;
GISTSearchStack *p = s->next;
pfree(s);
s = p;
}
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.3 2005/06/27 12:45:22 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
......@@ -803,8 +803,12 @@ GISTInitBuffer(Buffer b, uint32 f)
page = BufferGetPage(b);
PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
opaque = GistPageGetOpaque(page);
opaque->flags = f;
opaque->nsplited = 0;
opaque->level = 0;
opaque->rightlink = InvalidBlockNumber;
memset( &(opaque->nsn), 0, sizeof(GistNSN) );
}
void
......@@ -856,30 +860,38 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
}
Buffer
gistReadBuffer(Relation r, BlockNumber blkno) {
gistNewBuffer(Relation r) {
Buffer buffer = InvalidBuffer;
if ( blkno != P_NEW ) {
buffer = ReadBuffer(r, blkno);
} else {
Page page;
bool needLock;
while(true) {
blkno = GetFreeIndexPage(&r->rd_node);
BlockNumber blkno = GetFreeIndexPage(&r->rd_node);
if (blkno == InvalidBlockNumber)
break;
buffer = ReadBuffer(r, blkno);
page = BufferGetPage(buffer);
if ( ConditionalLockBuffer(buffer) ) {
Page page = BufferGetPage(buffer);
if ( GistPageIsDeleted( page ) ) {
GistPageSetNonDeleted( page );
return buffer;
} else
LockBuffer(buffer, GIST_UNLOCK);
}
ReleaseBuffer( buffer );
}
needLock = !RELATION_IS_LOCAL(r);
if (needLock)
LockRelationForExtension(r, ExclusiveLock);
buffer = ReadBuffer(r, P_NEW);
}
LockBuffer(buffer, GIST_EXCLUSIVE);
if (needLock)
UnlockRelationForExtension(r, ExclusiveLock);
return buffer;
}
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.2 2005/06/20 15:22:37 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.3 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -34,43 +34,14 @@ typedef struct {
Relation index;
MemoryContext opCtx;
IndexBulkDeleteResult *result;
/* path to root */
BlockNumber *path;
int pathlen;
int curpathlen;
} GistVacuum;
static void
shiftPath(GistVacuum *gv, BlockNumber blkno) {
if ( gv->pathlen == 0 ) {
gv->pathlen = 8;
gv->path = (BlockNumber*) palloc( MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
} else if ( gv->pathlen == gv->curpathlen ) {
gv->pathlen *= 2;
gv->path = (BlockNumber*) repalloc( gv->path, MAXALIGN(sizeof(BlockNumber)*gv->pathlen) );
}
if ( gv->curpathlen )
memmove( gv->path+1, gv->path, sizeof(BlockNumber)*gv->curpathlen );
gv->curpathlen++;
gv->path[0] = blkno;
}
static void
unshiftPath(GistVacuum *gv) {
gv->curpathlen--;
if ( gv->curpathlen )
memmove( gv->path, gv->path+1, sizeof(BlockNumber)*gv->curpathlen );
}
/*
 * Result of one gistVacuumUpdate() call, handed back to the caller that
 * processed the parent page.
 */
typedef struct {
IndexTuple *itup; /* array of index tuples; NULL when there are none */
int ituplen; /* presumably the number of entries in itup -- TODO confirm */
bool emptypage; /* NOTE(review): looks like "the visited page ended up empty"; confirm */
} ArrayTuple;
static ArrayTuple
gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
ArrayTuple res = {NULL, 0, false};
......@@ -100,7 +71,6 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
completed = (ItemPointerData*)palloc( sizeof(ItemPointerData)*lencompleted );
addon=(IndexTuple*)palloc(sizeof(IndexTuple)*lenaddon);
shiftPath(gv, blkno);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
ArrayTuple chldtuple;
bool needchildunion;
......@@ -115,8 +85,6 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
chldtuple = gistVacuumUpdate( gv, ItemPointerGetBlockNumber(&(idxtuple->t_tid)),
needchildunion );
if ( chldtuple.ituplen || chldtuple.emptypage ) {
/* adjust any scans that will be affected by this deletion */
gistadjscans(gv->index, GISTOP_DEL, blkno, i);
PageIndexTupleDelete(page, i);
todelete[ ntodelete++ ] = i;
i--; maxoff--;
......@@ -180,10 +148,8 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
oldCtx = MemoryContextSwitchTo(gv->opCtx);
/* path is need to recovery because there is new pages, in a case of
crash it's needed to add inner tuple pointers on parent page */
rdata = formSplitRdata(gv->index->rd_node, blkno,
&key, gv->path, gv->curpathlen, dist);
&key, dist);
MemoryContextSwitchTo(oldCtx);
......@@ -198,11 +164,18 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
}
END_CRIT_SECTION();
} else {
ptr = dist;
while(ptr) {
PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
ptr=ptr->next;
}
}
ptr = dist;
while(ptr) {
if ( BufferGetBlockNumber(ptr->buffer) != blkno )
LockBuffer( ptr->buffer, GIST_UNLOCK );
WriteBuffer(ptr->buffer);
ptr=ptr->next;
}
......@@ -213,8 +186,10 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
oldCtx = MemoryContextSwitchTo(gv->opCtx);
gistnewroot(gv->index, res.itup, res.ituplen, &key);
gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
MemoryContextSwitchTo(oldCtx);
WriteNoReleaseBuffer(buffer);
}
needwrite=false;
......@@ -223,16 +198,15 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
needunion = false; /* gistSplit already forms unions */
} else {
/* enough free space */
OffsetNumber off = (PageIsEmpty(page)) ?
FirstOffsetNumber
:
OffsetNumberNext(PageGetMaxOffsetNumber(page));
/* enough free space */
gistfillbuffer(gv->index, page, addon, curlenaddon, off);
}
}
unshiftPath(gv);
}
if ( needunion ) {
......@@ -272,22 +246,22 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
if ( !gv->index->rd_istemp ) {
XLogRecData *rdata;
XLogRecPtr recptr;
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
char *xlinfo;
/* In a vacuum, it's not need to push path, because
there is no new inserted keys */
rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
res.emptypage, addon, curlenaddon, NULL, NULL, 0);
MemoryContextSwitchTo(oldCtx);
res.emptypage, addon, curlenaddon, NULL );
xlinfo = rdata->data;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
MemoryContextReset(gv->opCtx);
}
pfree( xlinfo );
pfree( rdata );
} else
PageSetLSN(page, XLogRecPtrForTemp);
WriteBuffer( buffer );
} else
ReleaseBuffer( buffer );
......@@ -318,22 +292,20 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
BlockNumber npages, blkno;
BlockNumber nFreePages, *freePages, maxFreePages;
BlockNumber lastBlock = GIST_ROOT_BLKNO, lastFilledBlock = GIST_ROOT_BLKNO;
/* LockRelation(rel, AccessExclusiveLock); */
bool needLock;
/* gistVacuumUpdate may cause hard work */
if ( info->vacuum_full ) {
GistVacuum gv;
ArrayTuple res;
LockRelation(rel, AccessExclusiveLock);
gv.index = rel;
initGISTstate(&(gv.giststate), rel);
gv.opCtx = createTempGistContext();
gv.result = stats;
gv.path=NULL;
gv.pathlen = gv.curpathlen = 0;
/* walk through the entire index for update tuples */
res = gistVacuumUpdate( &gv, GIST_ROOT_BLKNO, false );
/* cleanup */
......@@ -343,8 +315,6 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
pfree( res.itup[i] );
pfree( res.itup );
}
if ( gv.path )
pfree( gv.path );
freeGISTstate(&(gv.giststate));
MemoryContextDelete(gv.opCtx);
} else if (needFullVacuum) {
......@@ -354,16 +324,29 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
needFullVacuum = false;
needLock = !RELATION_IS_LOCAL(rel);
if ( info->vacuum_full )
needLock = false; /* relation locked with AccessExclusiveLock */
/* try to find deleted pages */
if (needLock)
LockRelationForExtension(rel, ExclusiveLock);
npages = RelationGetNumberOfBlocks(rel);
maxFreePages = RelationGetNumberOfBlocks(rel);
if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock);
maxFreePages = npages;
if ( maxFreePages > MaxFSMPages )
maxFreePages = MaxFSMPages;
nFreePages = 0;
freePages = (BlockNumber*) palloc (sizeof(BlockNumber) * maxFreePages);
for(blkno=GIST_ROOT_BLKNO+1;blkno<npages;blkno++) {
Buffer buffer = ReadBuffer(rel, blkno);
Page page=(Page)BufferGetPage(buffer);
Page page;
LockBuffer( buffer, GIST_SHARE );
page=(Page)BufferGetPage(buffer);
if ( GistPageIsDeleted(page) ) {
if (nFreePages < maxFreePages) {
......@@ -372,12 +355,12 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
}
} else
lastFilledBlock = blkno;
LockBuffer( buffer, GIST_UNLOCK );
ReleaseBuffer(buffer);
}
lastBlock = npages-1;
if ( nFreePages > 0 ) {
if ( info->vacuum_full ) { /* try to truncate index */
if ( info->vacuum_full && nFreePages>0 ) { /* try to truncate index */
int i;
for(i=0;i<nFreePages;i++)
if ( freePages[i] >= lastFilledBlock ) {
......@@ -390,28 +373,50 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) {
stats->pages_removed = lastBlock - lastFilledBlock;
}
if ( nFreePages > 0 )
RecordIndexFreeSpace( &rel->rd_node, nFreePages, freePages );
}
pfree( freePages );
/* return statistics */
stats->pages_free = nFreePages;
if (needLock)
LockRelationForExtension(rel, ExclusiveLock);
stats->num_pages = RelationGetNumberOfBlocks(rel);
if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock);
/* UnlockRelation(rel, AccessExclusiveLock); */
if (info->vacuum_full)
UnlockRelation(rel, AccessExclusiveLock);
PG_RETURN_POINTER(stats);
}
/*
 * Entry of the singly linked list of pages that gistbulkdelete() still has
 * to visit.
 */
typedef struct GistBDItem {
GistNSN parentlsn; /* LSN of the parent page as read when this entry was pushed;
                    * a sibling pushed after a detected split inherits it */
BlockNumber blkno; /* block number of the page to read */
struct GistBDItem *next; /* next entry to process, or NULL */
} GistBDItem;
/*
 * pushStackIfSplited() -- queue the right sibling of 'page' when the page
 * appears to have been split after its parent was examined.
 *
 * A split is assumed when the entry is not the root block, its parentlsn is
 * valid and older than the page's NSN, and the page has a right link.  The
 * new entry is inserted directly after 'stack' and inherits its parentlsn.
 */
static void
pushStackIfSplited(Page page, GistBDItem *stack)
{
	GISTPageOpaque pageinfo = GistPageGetOpaque(page);
	GistBDItem *sibling;

	/* never chase a right link from the root block */
	if (stack->blkno == GIST_ROOT_BLKNO)
		return;

	/* without a recorded parent LSN there is nothing to compare against */
	if (XLogRecPtrIsInvalid(stack->parentlsn))
		return;

	/* NSN not newer than the parent LSN: no split to chase */
	if (!XLByteLT(stack->parentlsn, pageinfo->nsn))
		return;

	/* sanity check */
	if (pageinfo->rightlink == InvalidBlockNumber)
		return;

	/* split page detected, install right link to the stack */
	sibling = (GistBDItem *) palloc(sizeof(GistBDItem));
	sibling->next = stack->next;
	sibling->parentlsn = stack->parentlsn;
	sibling->blkno = pageinfo->rightlink;
	stack->next = sibling;
}
/*
* Bulk deletion of all index entries pointing to a set of heap tuples and
* update invalid tuples after crash recovery.
* check invalid tuples after crash recovery.
* The set of target tuples is specified via a callback routine that tells
* whether any given heap tuple (identified by ItemPointer) is being deleted.
*
......@@ -424,68 +429,71 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
void* callback_state = (void *) PG_GETARG_POINTER(2);
IndexBulkDeleteResult *result = (IndexBulkDeleteResult*)palloc0(sizeof(IndexBulkDeleteResult));
GistBDItem *stack, *ptr;
MemoryContext opCtx = createTempGistContext();
bool needLock;
stack = (GistBDItem*) palloc(sizeof(GistBDItem));
stack = (GistBDItem*) palloc0(sizeof(GistBDItem));
stack->blkno = GIST_ROOT_BLKNO;
stack->next = NULL;
needFullVacuum = false;
while( stack ) {
Buffer buffer = ReadBuffer(rel, stack->blkno);
Page page = (Page) BufferGetPage(buffer);
OffsetNumber i, maxoff = PageGetMaxOffsetNumber(page);
Page page;
OffsetNumber i, maxoff;
IndexTuple idxtuple;
ItemId iid;
LockBuffer(buffer, GIST_SHARE);
page = (Page) BufferGetPage(buffer);
if ( GistPageIsLeaf(page) ) {
OffsetNumber *todelete = NULL;
int ntodelete = 0;
if ( GistPageIsLeaf(page) ) {
ItemPointerData heapptr;
LockBuffer(buffer, GIST_UNLOCK);
LockBuffer(buffer, GIST_EXCLUSIVE);
todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*maxoff) );
page = (Page) BufferGetPage(buffer);
if ( stack->blkno==GIST_ROOT_BLKNO && !GistPageIsLeaf(page) ) {
/* the only root can become non-leaf during relock */
LockBuffer(buffer, GIST_UNLOCK);
ReleaseBuffer(buffer);
/* one more check */
continue;
}
/* check for split proceeded after look at parent,
we should check it after relock */
pushStackIfSplited(page, stack);
maxoff = PageGetMaxOffsetNumber(page);
todelete = (OffsetNumber*)palloc( MAXALIGN(sizeof(OffsetNumber)*(maxoff+1)) );
for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
heapptr = idxtuple->t_tid;
if ( callback(&heapptr, callback_state) ) {
gistadjscans(rel, GISTOP_DEL, stack->blkno, i);
if ( callback(&(idxtuple->t_tid), callback_state) ) {
PageIndexTupleDelete(page, i);
todelete[ ntodelete++ ] = i;
i--; maxoff--;
todelete[ ntodelete ] = i;
i--; maxoff--; ntodelete++;
result->tuples_removed += 1;
Assert( maxoff == PageGetMaxOffsetNumber(page) );
} else
result->num_index_tuples += 1;
}
} else {
for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
ptr = (GistBDItem*) palloc(sizeof(GistBDItem));
ptr->blkno = ItemPointerGetBlockNumber( &(idxtuple->t_tid) );
ptr->next = stack->next;
stack->next = ptr;
if ( GistTupleIsInvalid(idxtuple) )
needFullVacuum = true;
}
}
if ( ntodelete && todelete ) {
if ( ntodelete ) {
GistMarkTuplesDeleted(page);
if (!rel->rd_istemp ) {
XLogRecData *rdata;
XLogRecPtr recptr;
MemoryContext oldCtx = MemoryContextSwitchTo(opCtx);
gistxlogEntryUpdate *xlinfo;
rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
false, NULL, 0, NULL, NULL, 0);
MemoryContextSwitchTo(oldCtx);
false, NULL, 0, NULL);
xlinfo = (gistxlogEntryUpdate*)rdata->data;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
......@@ -493,15 +501,38 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
MemoryContextReset(opCtx);
pfree( xlinfo );
pfree( rdata );
} else
PageSetLSN(page, XLogRecPtrForTemp);
WriteNoReleaseBuffer( buffer );
}
WriteBuffer( buffer );
} else
pfree( todelete );
} else {
/* check for split proceeded after look at parent */
pushStackIfSplited(page, stack);
maxoff = PageGetMaxOffsetNumber(page);
for(i=FirstOffsetNumber;i<=maxoff;i=OffsetNumberNext(i)) {
iid = PageGetItemId(page, i);
idxtuple = (IndexTuple) PageGetItem(page, iid);
ptr = (GistBDItem*) palloc(sizeof(GistBDItem));
ptr->blkno = ItemPointerGetBlockNumber( &(idxtuple->t_tid) );
ptr->parentlsn = PageGetLSN( page );
ptr->next = stack->next;
stack->next = ptr;
if ( GistTupleIsInvalid(idxtuple) )
needFullVacuum = true;
}
}
LockBuffer( buffer, GIST_UNLOCK );
ReleaseBuffer( buffer );
if ( todelete )
pfree( todelete );
ptr = stack->next;
pfree( stack );
......@@ -510,10 +541,13 @@ gistbulkdelete(PG_FUNCTION_ARGS) {
vacuum_delay_point();
}
MemoryContextDelete( opCtx );
needLock = !RELATION_IS_LOCAL(rel);
if (needLock)
LockRelationForExtension(rel, ExclusiveLock);
result->num_pages = RelationGetNumberOfBlocks(rel);
if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock);
PG_RETURN_POINTER( result );
}
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.3 2005/06/20 15:22:37 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.4 2005/06/27 12:45:22 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
......@@ -27,7 +27,6 @@ typedef struct {
gistxlogEntryUpdate *data;
int len;
IndexTuple *itup;
BlockNumber *path;
OffsetNumber *todelete;
} EntryUpdateRecord;
......@@ -39,7 +38,6 @@ typedef struct {
typedef struct {
gistxlogPageSplit *data;
NewPage *page;
BlockNumber *path;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
......@@ -49,9 +47,9 @@ typedef struct gistIncompleteInsert {
ItemPointerData key;
int lenblk;
BlockNumber *blkno;
int pathlen;
BlockNumber *path;
XLogRecPtr lsn;
BlockNumber *path;
int pathlen;
} gistIncompleteInsert;
......@@ -69,7 +67,6 @@ static List *incomplete_inserts;
static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk,
BlockNumber *path, int pathlen,
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
......@@ -93,15 +90,6 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
}
Assert( ninsert->lenblk>0 );
if ( path && pathlen ) {
ninsert->pathlen = pathlen;
ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
} else {
ninsert->pathlen = 0;
ninsert->path = NULL;
}
incomplete_inserts = lappend(incomplete_inserts, ninsert);
MemoryContextSwitchTo(oldCxt);
}
......@@ -116,7 +104,6 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
/* found */
if ( insert->path ) pfree( insert->path );
pfree( insert->blkno );
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
pfree( insert );
......@@ -132,15 +119,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
decoded->data = (gistxlogEntryUpdate*)begin;
if ( decoded->data->pathlen ) {
addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
} else
decoded->path = NULL;
if ( decoded->data->ntodelete ) {
decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
addpath = MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
} else
decoded->todelete = NULL;
......@@ -244,7 +225,6 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
&(xlrec.data->blkno), 1,
xlrec.path, xlrec.data->pathlen,
NULL);
}
}
......@@ -252,18 +232,12 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
int j,i=0, addpath = 0;
int j,i=0;
decoded->data = (gistxlogPageSplit*)begin;
decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
if ( decoded->data->pathlen ) {
addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit ));
} else
decoded->path = NULL;
ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
ptr=begin+sizeof( gistxlogPageSplit );
for(i=0;i<decoded->data->npage;i++) {
Assert( ptr - begin < record->xl_len );
decoded->page[i].header = (gistxlogPage*)ptr;
......@@ -342,7 +316,6 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0,
xlrec.path, xlrec.data->pathlen,
&xlrec);
}
}
......@@ -499,6 +472,36 @@ gist_form_invalid_tuple(BlockNumber blkno) {
return tuple;
}
/*
 * Construct insert->path / insert->pathlen for an incomplete insert.
 *
 * Each block number stored in insert->blkno[0 .. lenblk-1] is handed to
 * gistFindPath() in turn.  For the first block that gistFindPath() can
 * locate, the returned GISTInsertStack chain is followed through its
 * ->parent links: the number of entries becomes insert->pathlen and their
 * block numbers are copied, in that same order, into a newly palloc'd
 * insert->path array.
 *
 * If none of the blocks is found, insert->pathlen is left at 0 and
 * insert->path at NULL; gistContinueInsert() treats pathlen == 0 as the
 * "root was split" case.
 *
 * The scan is bounded by insert->lenblk.  The previous loop condition
 * tested only "insert->lenblk", which never changes, so a failed lookup
 * made the loop read past the end of insert->blkno[].  A separate index is
 * used to fill the path so the loop counter is not overwritten.
 */
static void
gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
	int i;

	insert->pathlen = 0;
	insert->path = NULL;

	for(i=0;i<insert->lenblk;i++) {
		GISTInsertStack *top = gistFindPath(index, insert->blkno[i], XLogReadBuffer);
		GISTInsertStack *ptr;
		int n;

		if ( top == NULL )
			continue;	/* this block was not found, try the next one */

		/* first pass: count the entries reachable through ->parent */
		for(ptr=top; ptr; ptr=ptr->parent)
			insert->pathlen++;

		insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );

		/* second pass: store the block numbers in chain order */
		n = 0;
		for(ptr=top; ptr; ptr=ptr->parent)
			insert->path[n++] = ptr->blkno;

		/* only the first block that can be located is used */
		break;
	}
}
static void
gistContinueInsert(gistIncompleteInsert *insert) {
IndexTuple *itup;
......@@ -523,6 +526,9 @@ gistContinueInsert(gistIncompleteInsert *insert) {
for(i=0;i<insert->lenblk;i++)
itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
/* construct path */
gixtxlogFindPath( index, insert );
if ( insert->pathlen==0 ) {
/*it was split root, so we should only make new root*/
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
......@@ -662,8 +668,7 @@ gist_xlog_cleanup(void) {
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno,
ItemPointer key,
BlockNumber *path, int pathlen, SplitedPageLayout *dist ) {
ItemPointer key, SplitedPageLayout *dist ) {
XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
......@@ -681,7 +686,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
xlrec->node = node;
xlrec->origblkno = blkno;
xlrec->npage = (uint16)npage;
xlrec->pathlen = (uint16)pathlen;
if ( key )
xlrec->key = *key;
else
......@@ -692,15 +696,6 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
rdata[0].len = sizeof( gistxlogPageSplit );
rdata[0].next = NULL;
if ( pathlen ) {
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)path;
rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
rdata[cur].next = NULL;
cur++;
}
ptr=dist;
while(ptr) {
rdata[cur].buffer = InvalidBuffer;
......@@ -725,8 +720,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
XLogRecData *
formUpdateRdata(RelFileNode node, BlockNumber blkno,
OffsetNumber *todelete, int ntodelete, bool emptypage,
IndexTuple *itup, int ituplen, ItemPointer key,
BlockNumber *path, int pathlen) {
IndexTuple *itup, int ituplen, ItemPointer key ) {
XLogRecData *rdata;
gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
......@@ -740,7 +734,6 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno,
if ( emptypage ) {
xlrec->isemptypage = true;
xlrec->ntodelete = 0;
xlrec->pathlen = 0;
rdata = (XLogRecData*)palloc( sizeof(XLogRecData) );
rdata->buffer = InvalidBuffer;
......@@ -752,24 +745,14 @@ formUpdateRdata(RelFileNode node, BlockNumber blkno,
xlrec->isemptypage = false;
xlrec->ntodelete = ntodelete;
xlrec->pathlen = pathlen;
rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) );
rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 2 + ituplen ) );
rdata->buffer = InvalidBuffer;
rdata->data = (char*)xlrec;
rdata->len = sizeof(gistxlogEntryUpdate);
rdata->next = NULL;
if ( pathlen ) {
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)path;
rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
rdata[cur].next = NULL;
cur++;
}
if ( ntodelete ) {
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/index/indexam.c,v 1.83 2005/06/13 23:14:48 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/index/indexam.c,v 1.84 2005/06/27 12:45:22 teodor Exp $
*
* INTERFACE ROUTINES
* index_open - open an index relation by relation OID
......@@ -287,7 +287,6 @@ index_beginscan_internal(Relation indexRelation,
FmgrInfo *procedure;
RELATION_CHECKS;
GET_REL_PROCEDURE(ambeginscan);
RelationIncrementReferenceCount(indexRelation);
......@@ -300,6 +299,13 @@ index_beginscan_internal(Relation indexRelation,
*/
LockRelation(indexRelation, AccessShareLock);
/*
* LockRelation can clean rd_aminfo structure, so fill procedure
* after LockRelation
*/
GET_REL_PROCEDURE(ambeginscan);
/*
* Tell the AM to open a scan.
*/
......
......@@ -9,7 +9,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/gist.h,v 1.47 2005/06/20 10:29:36 teodor Exp $
* $PostgreSQL: pgsql/src/include/access/gist.h,v 1.48 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -19,6 +19,8 @@
#include "storage/bufpage.h"
#include "storage/off.h"
#include "utils/rel.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
/*
* amproc indexes for GiST indexes.
......@@ -39,9 +41,22 @@
#define F_DELETED (1 << 1)
#define F_TUPLES_DELETED (1 << 2)
typedef XLogRecPtr GistNSN;
typedef struct GISTPageOpaqueData
{
uint32 flags;
uint8 flags;
/* number page to which current one is splitted in last split */
uint8 nsplited;
/* level of page, 0 - leaf */
uint16 level;
BlockNumber rightlink;
/* the only meaning - change this value if
page split. */
GistNSN nsn;
} GISTPageOpaqueData;
typedef GISTPageOpaqueData *GISTPageOpaque;
......@@ -90,18 +105,20 @@ typedef struct GISTENTRY
bool leafkey;
} GISTENTRY;
#define GistPageIsLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_LEAF)
#define GistPageGetOpaque(page) ( (GISTPageOpaque) PageGetSpecialPointer(page) )
#define GistPageIsLeaf(page) ( GistPageGetOpaque(page)->flags & F_LEAF)
#define GIST_LEAF(entry) (GistPageIsLeaf((entry)->page))
#define GistPageSetLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_LEAF)
#define GistPageSetNonLeaf(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_LEAF)
#define GistPageSetLeaf(page) ( GistPageGetOpaque(page)->flags |= F_LEAF)
#define GistPageSetNonLeaf(page) ( GistPageGetOpaque(page)->flags &= ~F_LEAF)
#define GistPageIsDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_DELETED)
#define GistPageSetDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_DELETED)
#define GistPageSetNonDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_DELETED)
#define GistPageIsDeleted(page) ( GistPageGetOpaque(page)->flags & F_DELETED)
#define GistPageSetDeleted(page) ( GistPageGetOpaque(page)->flags |= F_DELETED)
#define GistPageSetNonDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_DELETED)
#define GistTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags & F_TUPLES_DELETED)
#define GistMarkTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags |= F_TUPLES_DELETED)
#define GistClearTuplesDeleted(page) (((GISTPageOpaque) PageGetSpecialPointer(page))->flags &= ~F_TUPLES_DELETED)
#define GistTuplesDeleted(page) ( GistPageGetOpaque(page)->flags & F_TUPLES_DELETED)
#define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
#define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
/*
* Vector of GISTENTRY structs; user-defined methods union and pick
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.5 2005/06/20 15:22:38 teodor Exp $
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.6 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -20,7 +20,13 @@
#include "access/xlogdefs.h"
#include "fmgr.h"
/*
 * GiST-local names for the buffer lock modes; the GiST code passes them
 * as the mode argument of LockBuffer().
 */
#define GIST_UNLOCK BUFFER_LOCK_UNLOCK
#define GIST_SHARE BUFFER_LOCK_SHARE
#define GIST_EXCLUSIVE BUFFER_LOCK_EXCLUSIVE
/*
* XXX old comment!!!
* When we descend a tree, we keep a stack of parent pointers. This
* allows us to follow a chain of internal node points until we reach
* a leaf node, and then back up the stack to re-examine the internal
......@@ -31,12 +37,15 @@
* the node's page that we stopped at (i.e. we followed the child
* pointer located at the specified offset).
*/
typedef struct GISTSTACK
typedef struct GISTSearchStack
{
struct GISTSTACK *parent;
OffsetNumber offset;
struct GISTSearchStack *next;
BlockNumber block;
} GISTSTACK;
/* to identify page changed */
GistNSN lsn;
/* to recognize that a split occurred */
GistNSN parentlsn;
} GISTSearchStack;
typedef struct GISTSTATE
{
......@@ -57,8 +66,8 @@ typedef struct GISTSTATE
*/
typedef struct GISTScanOpaqueData
{
GISTSTACK *stack;
GISTSTACK *markstk;
GISTSearchStack *stack;
GISTSearchStack *markstk;
uint16 flags;
GISTSTATE *giststate;
MemoryContext tempCxt;
......@@ -68,6 +77,71 @@ typedef struct GISTScanOpaqueData
typedef GISTScanOpaqueData *GISTScanOpaque;
/* XLog stuff */
extern const XLogRecPtr XLogRecPtrForTemp;
#define XLOG_GIST_ENTRY_UPDATE 0x00
#define XLOG_GIST_ENTRY_DELETE 0x10
#define XLOG_GIST_NEW_ROOT 0x20
/*
 * Fixed-size header of the XLog record that formUpdateRdata() builds and
 * decodeEntryUpdateRecord() parses.
 */
typedef struct gistxlogEntryUpdate {
/* identifies the index the record belongs to */
RelFileNode node;
/* block number of the page the record refers to */
BlockNumber blkno;
/* number of OffsetNumbers in the todelete array that follows the header */
uint16 ntodelete;
/* set by formUpdateRdata() when it is called with emptypage = true (ntodelete is then 0) */
bool isemptypage;
/*
 * Used to identify the completeness of an insert.
 * It is set from the leaf itup.
 */
ItemPointerData key;
/*
 * The header is followed by:
 * 1. the todelete OffsetNumbers
 * 2. the tuples to insert
 */
} gistxlogEntryUpdate;
#define XLOG_GIST_PAGE_SPLIT 0x30
/*
 * Fixed-size header of the page-split XLog record that formSplitRdata()
 * builds and decodePageSplitRecord() parses.
 */
typedef struct gistxlogPageSplit {
/* identifies the index the record belongs to */
RelFileNode node;
BlockNumber origblkno; /* block number of the page that was split */
/* number of per-page sections (gistxlogPage + tuples) that follow the header */
uint16 npage;
/* see the comments on the key field of gistxlogEntryUpdate */
ItemPointerData key;
/*
 * The header is followed by:
 * 1. for each page, a gistxlogPage and an array of IndexTupleData
 */
} gistxlogPageSplit;
#define XLOG_GIST_INSERT_COMPLETE 0x40
/*
 * Per-page header inside a page-split XLog record; each one is followed by
 * that page's array of IndexTupleData (see gistxlogPageSplit).
 */
typedef struct gistxlogPage {
/* block number of the page described by this section */
BlockNumber blkno;
/* NOTE(review): presumably the number of index tuples stored for this page -- confirm */
int num;
} gistxlogPage;
#define XLOG_GIST_CREATE_INDEX 0x50
/*
 * Fixed-size header of the insert-completion XLog record.
 */
typedef struct gistxlogInsertComplete {
/* identifies the index the record belongs to */
RelFileNode node;
/*
 * The header is followed by the ItemPointerData key(s) to clean.
 * NOTE(review): gistxlogInsertCompletion() takes an array of keys, so more
 * than one key may follow -- confirm.
 */
} gistxlogInsertComplete;
/*
 * SplitedPageLayout - result of the gistSplit function.
 *
 * One entry per page; entries are chained through ->next, and
 * formSplitRdata() walks that chain to build the page-split XLog record.
 */
typedef struct SplitedPageLayout {
/* per-page header (block number and count) for this page */
gistxlogPage block;
/* index tuple data belonging to this page */
IndexTupleData *list;
/* length of list; NOTE(review): unit (bytes or tuples) is not visible here -- confirm */
int lenlist;
Buffer buffer; /* buffer to write after all pages have been processed */
/* next page of the split, or NULL at the end of the chain */
struct SplitedPageLayout *next;
} SplitedPageLayout;
/*
* GISTInsertStack used for locking buffers and transfer arguments during
* insertion
......@@ -79,15 +153,24 @@ typedef struct GISTInsertStack {
Buffer buffer;
Page page;
/* log sequence number from page->lsn to
recognize page update and compare it with page's nsn
to recognize page split*/
GistNSN lsn;
/* child's offset */
OffsetNumber childoffnum;
/* pointer to parent */
/* pointer to parent and child */
struct GISTInsertStack *parent;
struct GISTInsertStack *child;
bool todelete;
/* for gistFindPath */
struct GISTInsertStack *next;
} GISTInsertStack;
/*
 * True when both the xlogid and xrecoff fields of r are zero.
 * The argument is evaluated twice, so it must not have side effects.
 */
#define XLogRecPtrIsInvalid( r ) ( (r).xlogid == 0 && (r).xrecoff == 0 )
typedef struct {
Relation r;
IndexTuple *itup; /* in/out, points to compressed entry */
......@@ -97,10 +180,6 @@ typedef struct {
/* pointer to heap tuple */
ItemPointerData key;
/* path to stroe in XLog */
BlockNumber *path;
int pathlen;
} GISTInsertState;
/*
......@@ -124,7 +203,7 @@ typedef struct {
* constants tell us what sort of operation changed the index.
*/
#define GISTOP_DEL 0
#define GISTOP_SPLIT 1
/* #define GISTOP_SPLIT 1 */
#define ATTSIZE(datum, tupdesc, i, isnull) \
( \
......@@ -132,64 +211,6 @@ typedef struct {
att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
)
/* XLog stuff */
#define XLOG_GIST_ENTRY_UPDATE 0x00
#define XLOG_GIST_ENTRY_DELETE 0x10
#define XLOG_GIST_NEW_ROOT 0x20
typedef struct gistxlogEntryUpdate {
RelFileNode node;
BlockNumber blkno;
uint16 ntodelete;
uint16 pathlen;
bool isemptypage;
/*
* It used to identify completeness of insert.
* Sets to leaf itup
*/
ItemPointerData key;
/* follow:
* 1. path to root (BlockNumber)
* 2. todelete OffsetNumbers
* 3. tuples to insert
*/
} gistxlogEntryUpdate;
#define XLOG_GIST_PAGE_SPLIT 0x30
typedef struct gistxlogPageSplit {
RelFileNode node;
BlockNumber origblkno; /*splitted page*/
uint16 pathlen;
uint16 npage;
/* see comments on gistxlogEntryUpdate */
ItemPointerData key;
/* follow:
* 1. path to root (BlockNumber)
* 2. gistxlogPage and array of IndexTupleData per page
*/
} gistxlogPageSplit;
typedef struct gistxlogPage {
BlockNumber blkno;
int num;
} gistxlogPage;
#define XLOG_GIST_INSERT_COMPLETE 0x40
typedef struct gistxlogInsertComplete {
RelFileNode node;
/* follows ItemPointerData key to clean */
} gistxlogInsertComplete;
#define XLOG_GIST_CREATE_INDEX 0x50
/*
* mark tuples on inner pages during recovery
*/
......@@ -206,20 +227,14 @@ extern Datum gistinsert(PG_FUNCTION_ARGS);
extern MemoryContext createTempGistContext(void);
extern void initGISTstate(GISTSTATE *giststate, Relation index);
extern void freeGISTstate(GISTSTATE *giststate);
extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key);
extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
typedef struct SplitedPageLayout {
gistxlogPage block;
IndexTupleData *list;
int lenlist;
Buffer buffer; /* to write after all proceed */
struct SplitedPageLayout *next;
} SplitedPageLayout;
IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
extern IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
extern GISTInsertStack* gistFindPath( Relation r, BlockNumber child,
Buffer (*myReadBuffer)(bool, Relation, BlockNumber) );
/* gistxlog.c */
extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(char *buf, uint8 xl_info, char *rec);
......@@ -229,12 +244,10 @@ extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
extern XLogRecData* formUpdateRdata(RelFileNode node, BlockNumber blkno,
OffsetNumber *todelete, int ntodelete, bool emptypage,
IndexTuple *itup, int ituplen, ItemPointer key,
BlockNumber *path, int pathlen);
IndexTuple *itup, int ituplen, ItemPointer key);
extern XLogRecData* formSplitRdata(RelFileNode node, BlockNumber blkno,
ItemPointer key,
BlockNumber *path, int pathlen, SplitedPageLayout *dist );
ItemPointer key, SplitedPageLayout *dist);
extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
......@@ -243,7 +256,7 @@ extern Datum gistgettuple(PG_FUNCTION_ARGS);
extern Datum gistgetmulti(PG_FUNCTION_ARGS);
/* gistutil.c */
extern Buffer gistReadBuffer(Relation r, BlockNumber blkno);
extern Buffer gistNewBuffer(Relation r);
extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off);
extern bool gistnospace(Page page, IndexTuple *itvec, int len);
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/gistscan.h,v 1.26 2004/12/31 22:03:21 pgsql Exp $
* $PostgreSQL: pgsql/src/include/access/gistscan.h,v 1.27 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -15,13 +15,14 @@
#define GISTSCAN_H
#include "access/relscan.h"
#include "access/xlogdefs.h"
extern Datum gistbeginscan(PG_FUNCTION_ARGS);
extern Datum gistrescan(PG_FUNCTION_ARGS);
extern Datum gistmarkpos(PG_FUNCTION_ARGS);
extern Datum gistrestrpos(PG_FUNCTION_ARGS);
extern Datum gistendscan(PG_FUNCTION_ARGS);
extern void gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum);
extern void gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum, XLogRecPtr newlsn, XLogRecPtr oldlsn);
extern void ReleaseResources_gist(void);
#endif /* GISTSCAN_H */
......@@ -37,7 +37,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.281 2005/06/24 20:53:31 tgl Exp $
* $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.282 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 200506241
#define CATALOG_VERSION_NO 200506271
#endif
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.36 2005/06/24 20:53:31 tgl Exp $
* $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.37 2005/06/27 12:45:23 teodor Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
......@@ -112,7 +112,7 @@ DESCR("b-tree index access method");
DATA(insert OID = 405 ( hash 1 1 0 f f f f t hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete - hashcostestimate ));
DESCR("hash index access method");
#define HASH_AM_OID 405
DATA(insert OID = 783 ( gist 100 7 0 f t f f f gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
DATA(insert OID = 783 ( gist 100 7 0 f t f f t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
DESCR("GiST index access method");
#define GIST_AM_OID 783
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册