diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index b44b7839dea98c05ee2065e6fbab756e6a6aa749..6a4b925cd150522b88181a8fad78f78d68b8d041 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.79 2001/01/31 01:08:36 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.80 2001/02/02 19:49:15 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -38,7 +38,10 @@ extern bool FixBTree; Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release); static void _bt_fixtree(Relation rel, BlockNumber blkno); -static BlockNumber _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit); +static void _bt_fixbranch(Relation rel, BlockNumber lblkno, + BlockNumber rblkno, BTStack true_stack); +static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit); +static void _bt_fixup(Relation rel, Buffer buf); static OffsetNumber _bt_getoff(Page page, BlockNumber blkno); static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); @@ -64,7 +67,7 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page, static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, int leftfree, int rightfree, bool newitemonleft, Size firstrightitemsz); -static Buffer _bt_getstackbuf(Relation rel, BTStack stack); +static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access); static void _bt_pgaddtup(Relation rel, Page page, Size itemsize, BTItem btitem, OffsetNumber itup_off, const char *where); @@ -497,34 +500,7 @@ _bt_insertonpg(Relation rel, elog(ERROR, "bt_insertonpg: no root page found"); _bt_wrtbuf(rel, rbuf); _bt_wrtnorelbuf(rel, buf); - while(! P_LEFTMOST(lpageop)) - { - BlockNumber blkno = lpageop->btpo_prev; - LockBuffer(buf, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buf); - buf = _bt_getbuf(rel, blkno, BT_WRITE); - page = BufferGetPage(buf); - lpageop = (BTPageOpaque) PageGetSpecialPointer(page); - /* - * If someone else already created parent pages - * then it's time for _bt_fixtree() to check upper - * levels and fix them, if required. - */ - if (lpageop->btpo_parent != BTREE_METAPAGE) - { - blkno = lpageop->btpo_parent; - _bt_relbuf(rel, buf, BT_WRITE); - _bt_fixtree(rel, blkno); - goto formres; - } - } - /* - * Ok, we are on the leftmost page, it's write locked - * by us and its btpo_parent points to meta page - time - * for _bt_fixroot(). - */ - buf = _bt_fixroot(rel, buf, true); - _bt_relbuf(rel, buf, BT_WRITE); + _bt_fixup(rel, buf); goto formres; } @@ -561,16 +537,22 @@ _bt_insertonpg(Relation rel, ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid), bknum, P_HIKEY); - pbuf = _bt_getstackbuf(rel, stack); - - if (pbuf == InvalidBuffer) - elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!" - "\n\tRecreate index %s.", RelationGetRelationName(rel)); + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); /* Now we can write and unlock the children */ _bt_wrtbuf(rel, rbuf); _bt_wrtbuf(rel, buf); + if (pbuf == InvalidBuffer) + { + if (!FixBTree) + elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!" + "\n\tRecreate index %s.", RelationGetRelationName(rel)); + pfree(new_item); + _bt_fixbranch(rel, bknum, rbknum, stack); + goto formres; + } + /* Recursively update the parent */ newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, 0, NULL, new_item, stack->bts_offset); @@ -1132,7 +1114,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, * Also, re-set bts_blkno & bts_offset if changed. */ static Buffer -_bt_getstackbuf(Relation rel, BTStack stack) +_bt_getstackbuf(Relation rel, BTStack stack, int access) { BlockNumber blkno; Buffer buf; @@ -1145,7 +1127,7 @@ _bt_getstackbuf(Relation rel, BTStack stack) BTPageOpaque opaque; blkno = stack->bts_blkno; - buf = _bt_getbuf(rel, blkno, BT_WRITE); + buf = _bt_getbuf(rel, blkno, access); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); maxoff = PageGetMaxOffsetNumber(page); @@ -1179,13 +1161,13 @@ _bt_getstackbuf(Relation rel, BTStack stack) /* by here, the item we're looking for moved right at least one page */ if (P_RIGHTMOST(opaque)) { - _bt_relbuf(rel, buf, BT_WRITE); + _bt_relbuf(rel, buf, access); return(InvalidBuffer); } blkno = opaque->btpo_next; - _bt_relbuf(rel, buf, BT_WRITE); - buf = _bt_getbuf(rel, blkno, BT_WRITE); + _bt_relbuf(rel, buf, access); + buf = _bt_getbuf(rel, blkno, access); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); maxoff = PageGetMaxOffsetNumber(page); @@ -1400,10 +1382,13 @@ _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release) /* * Now read other pages (if any) on level and add them to new root. + * Here we break one of our locking rules - never hold lock on parent + * page when acquiring lock on its child, - but we free from deadlock: + * * If concurrent process will split one of pages on this level then it - * will notice either btpo_parent == metablock or btpo_parent == rootblk. - * In first case it will give up its locks and try to lock leftmost page - * buffer (oldrootbuf) to fix root - ie it will wait for us and let us + * will see either btpo_parent == metablock or btpo_parent == rootblk. + * In first case it will give up its locks and walk to the leftmost page + * (oldrootbuf) in _bt_fixup() - ie it will wait for us and let us * continue. In second case it will try to lock rootbuf keeping its locks * on buffers we already passed, also waiting for us. If we'll have to * unlock rootbuf (split it) and that process will have to split page @@ -1547,14 +1532,12 @@ _bt_fixtree(Relation rel, BlockNumber blkno) * Check/fix level starting from page in buffer buf up to block * limit on *child* level (or till rightmost child page if limit * is InvalidBlockNumber). Start buffer must be read locked. - * No pins/locks are held on exit. Returns block number of last - * visited/pointing-to-limit page on *check/fix* level. + * No pins/locks are held on exit. */ -static BlockNumber +static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) { BlockNumber blkno = BufferGetBlockNumber(buf); - BlockNumber pblkno = blkno; Page page; BTPageOpaque opaque; BlockNumber cblkno[3]; @@ -1639,22 +1622,20 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) BTStackData stack; stack.bts_parent = NULL; - stack.bts_blkno = pblkno; + stack.bts_blkno = blkno; stack.bts_offset = InvalidOffsetNumber; ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), cblkno[0], P_HIKEY); - buf = _bt_getstackbuf(rel, &stack); + buf = _bt_getstackbuf(rel, &stack, BT_WRITE); if (buf == InvalidBuffer) elog(ERROR, "bt_fixlevel: pointer disappeared (need to recreate index)"); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); coff[0] = stack.bts_offset; - pblkno = BufferGetBlockNumber(buf); - parblk[0] = pblkno; - if (cblkno[0] == limit) - blkno = pblkno; /* where we have seen pointer to limit */ + blkno = BufferGetBlockNumber(buf); + parblk[0] = blkno; /* Check/insert missed pointers */ for (i = 1; i <= cidx; i++) @@ -1668,8 +1649,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) if (parblk[i] == parblk[i - 1] && coff[i] != coff[i - 1] + 1) elog(ERROR, "bt_fixlevel: invalid item order(2) (need to recreate index)"); - if (cblkno[i] == limit) - blkno = parblk[i]; continue; } /* Have to check next page ? */ @@ -1688,10 +1667,8 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) buf = newbuf; page = newpage; opaque = newopaque; - pblkno = BufferGetBlockNumber(buf); - parblk[i] = pblkno; - if (cblkno[i] == limit) - blkno = pblkno; + blkno = BufferGetBlockNumber(buf); + parblk[i] = blkno; continue; } /* unfound - need to insert on current page */ @@ -1730,7 +1707,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); } - pblkno = BufferGetBlockNumber(buf); + blkno = BufferGetBlockNumber(buf); coff[i] = itup_off; } else @@ -1740,9 +1717,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) } pfree(btitem); - parblk[i] = pblkno; - if (cblkno[i] == limit) - blkno = pblkno; + parblk[i] = blkno; } /* copy page with pointer to cblkno[cidx] to temp storage */ @@ -1750,8 +1725,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) _bt_relbuf(rel, buf, BT_WRITE); page = (Page)tbuf; opaque = (BTPageOpaque) PageGetSpecialPointer(page); - if (limit == InvalidBlockNumber) - blkno = pblkno; /* last visited page */ } /* Continue if current check/fix level page is rightmost */ @@ -1766,7 +1739,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) { if (cidx == 2) _bt_relbuf(rel, cbuf[2], BT_READ); - return(blkno); + return; } if (cblkno[0] == limit || cblkno[1] == limit) goodbye = true; @@ -1778,6 +1751,168 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) } } +/* + * Check/fix part of tree - branch - up from parent of level with blocks + * lblkno and rblknum. We first ensure that parent level has pointers + * to both lblkno & rblknum and if those pointers are on different + * parent pages then do the same for parent level, etc. No locks must + * be held on target level and upper on entry. No locks will be held + * on exit. Stack created when traversing tree down should be provided and + * it must points to parent level. rblkno must be on the right from lblkno. + * (This function is special edition of more expensive _bt_fixtree(), + * but it doesn't guarantee full consistency of tree.) + */ +static void +_bt_fixbranch(Relation rel, BlockNumber lblkno, + BlockNumber rblkno, BTStack true_stack) +{ + BlockNumber blkno = true_stack->bts_blkno; + BTStackData stack; + BTPageOpaque opaque; + Buffer buf, rbuf; + Page page; + OffsetNumber offnum; + + true_stack = true_stack->bts_parent; + for ( ; ; ) + { + buf = _bt_getbuf(rel, blkno, BT_READ); + + /* Check/fix parent level pointed by blkno */ + _bt_fixlevel(rel, buf, rblkno); + + /* + * Here parent level should have pointers for both + * lblkno and rblkno and we have to find them. + */ + stack.bts_parent = NULL; + stack.bts_blkno = blkno; + stack.bts_offset = InvalidOffsetNumber; + ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY); + buf = _bt_getstackbuf(rel, &stack, BT_READ); + if (buf == InvalidBuffer) + elog(ERROR, "bt_fixbranch: left pointer unfound (need to recreate index)"); + page = BufferGetPage(buf); + offnum = _bt_getoff(page, rblkno); + + if (offnum != InvalidOffsetNumber) /* right pointer found */ + { + if (offnum <= stack.bts_offset) + elog(ERROR, "bt_fixbranch: invalid item order (need to recreate index)"); + _bt_relbuf(rel, buf, BT_READ); + return; + } + + /* Pointers are on different parent pages - find right one */ + lblkno = BufferGetBlockNumber(buf); + + stack.bts_parent = NULL; + stack.bts_blkno = lblkno; + stack.bts_offset = InvalidOffsetNumber; + ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY); + rbuf = _bt_getstackbuf(rel, &stack, BT_READ); + if (rbuf == InvalidBuffer) + elog(ERROR, "bt_fixbranch: right pointer unfound (need to recreate index)"); + rblkno = BufferGetBlockNumber(rbuf); + _bt_relbuf(rel, rbuf, BT_READ); + + /* + * If we have parent item in true_stack then go up one level and + * ensure that it has pointers to new lblkno & rblkno. + */ + if (true_stack) + { + _bt_relbuf(rel, buf, BT_READ); + blkno = true_stack->bts_blkno; + true_stack = true_stack->bts_parent; + continue; + } + + /* + * Well, we are on the level that was root or unexistent when + * we started traversing tree down. If btpo_parent is updated + * then we'll use it to continue, else we'll fix/restore upper + * levels entirely. + */ + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (opaque->btpo_parent != BTREE_METAPAGE) + { + blkno = opaque->btpo_parent; + _bt_relbuf(rel, buf, BT_READ); + continue; + } + + /* Have to switch to excl buf lock and re-check btpo_parent */ + _bt_relbuf(rel, buf, BT_READ); + buf = _bt_getbuf(rel, blkno, BT_WRITE); + page = BufferGetPage(buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (opaque->btpo_parent != BTREE_METAPAGE) + { + blkno = opaque->btpo_parent; + _bt_relbuf(rel, buf, BT_WRITE); + continue; + } + + /* + * We hold excl lock on some internal page with unupdated + * btpo_parent - time for _bt_fixup. + */ + break; + } + + _bt_fixup(rel, buf); + + return; +} + +/* + * Having buf excl locked this routine walks to the left on level and + * uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper + * levels. No buffer pins/locks will be held on exit. + */ +static void +_bt_fixup(Relation rel, Buffer buf) +{ + Page page; + BTPageOpaque opaque; + BlockNumber blkno; + + for ( ; ; ) + { + page = BufferGetPage(buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + /* + * If someone else already created parent pages + * then it's time for _bt_fixtree() to check upper + * levels and fix them, if required. + */ + if (opaque->btpo_parent != BTREE_METAPAGE) + { + blkno = opaque->btpo_parent; + _bt_relbuf(rel, buf, BT_WRITE); + _bt_fixtree(rel, blkno); + return; + } + if (P_LEFTMOST(opaque)) + break; + blkno = opaque->btpo_prev; + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buf); + buf = _bt_getbuf(rel, blkno, BT_WRITE); + } + + /* + * Ok, we are on the leftmost page, it's write locked + * by us and its btpo_parent points to meta page - time + * for _bt_fixroot(). + */ + buf = _bt_fixroot(rel, buf, true); + _bt_relbuf(rel, buf, BT_WRITE); + + return; +} + static OffsetNumber _bt_getoff(Page page, BlockNumber blkno) {