提交 deee7830 编写于 作者: V Vadim B. Mikheev

WAL

上级 14f214de
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.87 2000/10/13 02:02:59 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.88 2000/10/13 12:05:20 vadim Exp $
* *
* *
* INTERFACE ROUTINES * INTERFACE ROUTINES
...@@ -2198,7 +2198,8 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -2198,7 +2198,8 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp); HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp);
/* is it our tuple ? */ /* is it our tuple ? */
if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) if (PageGetSUI(page) != ThisStartUpID ||
htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
{ {
if (!InRecovery) if (!InRecovery)
elog(STOP, "heap_insert_undo: invalid target tuple in rollback"); elog(STOP, "heap_insert_undo: invalid target tuple in rollback");
...@@ -2394,7 +2395,8 @@ newt:; ...@@ -2394,7 +2395,8 @@ newt:;
htup = (HeapTupleHeader) PageGetItem(page, lp); htup = (HeapTupleHeader) PageGetItem(page, lp);
/* is it our tuple ? */ /* is it our tuple ? */
if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) if (PageGetSUI(page) != ThisStartUpID ||
htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
{ {
if (!InRecovery) if (!InRecovery)
elog(STOP, "heap_update_undo: invalid new tuple in rollback"); elog(STOP, "heap_update_undo: invalid new tuple in rollback");
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.65 2000/10/13 02:03:00 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.66 2000/10/13 12:05:20 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -612,6 +612,10 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ...@@ -612,6 +612,10 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
OffsetNumber maxoff; OffsetNumber maxoff;
OffsetNumber i; OffsetNumber i;
#ifdef XLOG
BTItem lhikey;
#endif
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
origpage = BufferGetPage(buf); origpage = BufferGetPage(buf);
leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
...@@ -680,6 +684,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ...@@ -680,6 +684,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
itemsz = ItemIdGetLength(itemid); itemsz = ItemIdGetLength(itemid);
item = (BTItem) PageGetItem(origpage, itemid); item = (BTItem) PageGetItem(origpage, itemid);
} }
#ifdef XLOG
lhikey = item;
#endif
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
LP_USED) == InvalidOffsetNumber) LP_USED) == InvalidOffsetNumber)
elog(STOP, "btree: failed to add hikey to the left sibling"); elog(STOP, "btree: failed to add hikey to the left sibling");
...@@ -793,12 +800,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, ...@@ -793,12 +800,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode));
hsize += sizeof(RelFileNode); hsize += sizeof(RelFileNode);
} }
else
{
Size itemsz = IndexTupleDSize(lhikey->bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
memcpy(xlbuf + hsize, (char*) lhikey, itemsz);
hsize += itemsz;
}
if (newitemonleft) if (newitemonleft)
{ {
/* /*
* Read comments in _bt_pgaddtup. * Read comments in _bt_pgaddtup.
* Actually, seems that in non-leaf splits newitem shouldn't * Actually, seems that in non-leaf splits newitem shouldn't
* go to first data key position. * go to first data key position on left page.
*/ */
if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque)) if (! P_ISLEAF(lopaque) && itup_off == P_FIRSTDATAKEY(lopaque))
{ {
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.65 2000/10/13 12:05:20 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -837,7 +837,7 @@ static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) ...@@ -837,7 +837,7 @@ static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
ItemPointerGetOffsetNumber(&(xlrec->target.tid)), ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
(char*)xlrec + hsize, (char*)xlrec + hsize,
record->xl_len - hsize, record->xl_len - hsize,
&hnode)) hnode))
elog(STOP, "btree_insert_redo: failed to add item"); elog(STOP, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
...@@ -908,7 +908,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -908,7 +908,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
else else
{ {
/* Delete items related to new right sibling */ /* Delete items related to new right sibling */
_bt_thin_left_page(page, record); _bt_fix_left_page(page, record, onleft);
if (onleft) if (onleft)
{ {
...@@ -924,6 +924,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -924,6 +924,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
sizeof(CommandId), sizeof(RelFileNode)); sizeof(CommandId), sizeof(RelFileNode));
} }
else
{
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += itemsz;
}
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) + itemsz = IndexTupleDSize(btdata.bti_itup) +
...@@ -933,7 +940,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -933,7 +940,7 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
ItemPointerGetOffsetNumber(&(xlrec->target.tid)), ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
(char*)xlrec + hsize, (char*)xlrec + hsize,
itemsz, itemsz,
&hnode)) hnode))
elog(STOP, "btree_split_redo: failed to add item"); elog(STOP, "btree_split_redo: failed to add item");
} }
else else
...@@ -994,6 +1001,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) ...@@ -994,6 +1001,13 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
pageop->btpo_flags |= BTP_LEAF; pageop->btpo_flags |= BTP_LEAF;
hsize += (sizeof(CommandId) + sizeof(RelFileNode)); hsize += (sizeof(CommandId) + sizeof(RelFileNode));
} }
else
{
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += itemsz;
}
if (onleft) /* skip target item */ if (onleft) /* skip target item */
{ {
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
...@@ -1198,17 +1212,28 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, ...@@ -1198,17 +1212,28 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
{ {
char *xlrec = (char*) XLogRecGetData(record); char *xlrec = (char*) XLogRecGetData(record);
Page page = (Page) BufferGetPage(buffer); Page page = (Page) BufferGetPage(buffer);
BTPageOpaque pageop; BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
BlockNumber blkno; BlockNumber blkno;
OffsetNumber offno; OffsetNumber offno;
ItemId lp; ItemId lp;
BTItem item;
for ( ; ; ) for ( ; ; )
{ {
offno = _bt_find_btitem(page, btitem); OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
if (offno != InvalidOffsetNumber)
for (offno = P_FIRSTDATAKEY(pageop);
offno <= maxoff;
offno = OffsetNumberNext(offno))
{
lp = PageGetItemId(page, offno);
item = (BTItem) PageGetItem(page, lp);
if (BTItemSame(item, btitem))
break;
}
if (offno <= maxoff)
break; break;
pageop = (BTPageOpaque) PageGetSpecialPointer(page); offno = InvalidOffsetNumber;
if (P_RIGHTMOST(pageop)) if (P_RIGHTMOST(pageop))
break; break;
blkno = pageop->btpo_next; blkno = pageop->btpo_next;
...@@ -1221,6 +1246,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, ...@@ -1221,6 +1246,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
if (PageIsNew((PageHeader) page)) if (PageIsNew((PageHeader) page))
elog(STOP, "btree_%s_undo: uninitialized right sibling", elog(STOP, "btree_%s_undo: uninitialized right sibling",
(insert) ? "insert" : "split"); (insert) ? "insert" : "split");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (XLByteLT(PageGetLSN(page), lsn)) if (XLByteLT(PageGetLSN(page), lsn))
break; break;
} }
...@@ -1250,9 +1276,9 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, ...@@ -1250,9 +1276,9 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId)); memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode)); memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid), result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
record->xl_xid, cid); record->xl_xid, cid);
if (result <= 0) /* no tuple or not owner */ if (result < 0) /* not owner */
{ {
UnlockAndReleaseBuffer(buffer); UnlockAndReleaseBuffer(buffer);
return; return;
...@@ -1278,7 +1304,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, ...@@ -1278,7 +1304,7 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
static bool static bool
_bt_add_item(Page page, OffsetNumber offno, _bt_add_item(Page page, OffsetNumber offno,
char* item, Size size, RelFileNode* hnode) char* item, Size size, RelFileNode hnode)
{ {
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
...@@ -1309,4 +1335,125 @@ _bt_add_item(Page page, OffsetNumber offno, ...@@ -1309,4 +1335,125 @@ _bt_add_item(Page page, OffsetNumber offno,
return(true); return(true);
} }
static bool
_bt_cleanup_page(Page page, RelFileNode hnode)
{
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
OffsetNumber offno;
ItemId lp;
BTItem item;
bool result = false;
for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; )
{
lp = PageGetItemId(page, offno);
item = (BTItem) PageGetItem(page, lp);
if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid))
offno = OffsetNumberNext(offno);
else
{
PageIndexTupleDelete(page, offno);
maxoff = PageGetMaxOffsetNumber(page);
result = true;
}
}
return(result);
}
/*
* Remove from left sibling items belonging to right sibling
* and change P_HIKEY
*/
static void
_bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
{
char *xlrec = (char*) XLogRecGetData(record);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
Size hsize = SizeOfBtreeSplit;
RelFileNode hnode;
BTItemData btdata;
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
OffsetNumber offno;
char *item;
Size itemsz;
char *previtem = NULL;
char *lhikey = NULL;
Size lhisize = 0;
if (pageop->btpo_flags & BTP_LEAF)
{
hsize += (sizeof(CommandId) + sizeof(RelFileNode));
memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
sizeof(CommandId), sizeof(RelFileNode));
}
else
{
lhikey = (char*)xlrec + hsize;
memcpy(&btdata, lhikey, sizeof(BTItemData));
lhisize = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += lhisize;
}
if (! P_RIGHTMOST(pageop))
PageIndexTupleDelete(page, P_HIKEY);
if (onleft) /* skip target item */
{
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += itemsz;
}
for (item = (char*)xlrec + hsize; ; )
{
memcpy(&btdata, item, sizeof(BTItemData));
for (offno = P_FIRSTDATAKEY(pageop);
offno <= maxoff;
offno = OffsetNumberNext(offno))
{
ItemId lp = PageGetItemId(page, offno);
BTItem btitem = (BTItem) PageGetItem(page, lp);
if (BTItemSame(&btdata, btitem))
{
PageIndexTupleDelete(page, offno);
break;
}
}
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
if (item + itemsz < (char*)record + record->xl_len)
{
previtem = item;
item += itemsz;
}
else
break;
}
/* time to insert hi-key */
if (pageop->btpo_flags & BTP_LEAF)
{
lhikey = (P_RIGHTMOST(pageop)) ? item : previtem;
memcpy(&btdata, lhikey, sizeof(BTItemData));
lhisize = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
}
if (! _bt_add_item(page,
P_HIKEY,
lhikey,
lhisize,
&hnode))
elog(STOP, "btree_split_redo: failed to add hi key to left sibling");
return;
}
#endif #endif
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# Makefile for access/transam # Makefile for access/transam
# #
# IDENTIFICATION # IDENTIFICATION
# $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.12 2000/08/31 16:09:46 petere Exp $ # $Header: /cvsroot/pgsql/src/backend/access/transam/Makefile,v 1.13 2000/10/13 12:05:21 vadim Exp $
# #
#------------------------------------------------------------------------- #-------------------------------------------------------------------------
...@@ -12,7 +12,7 @@ subdir = src/backend/access/transam ...@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
top_builddir = ../../../.. top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global include $(top_builddir)/src/Makefile.global
OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o rmgr.o OBJS = transam.o transsup.o varsup.o xact.o xid.o xlog.o xlogutils.o rmgr.o
all: SUBSYS.o all: SUBSYS.o
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: nbtree.h,v 1.44 2000/10/13 02:03:02 vadim Exp $ * $Id: nbtree.h,v 1.45 2000/10/13 12:05:22 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -259,8 +259,9 @@ typedef struct xl_btree_insert ...@@ -259,8 +259,9 @@ typedef struct xl_btree_insert
/* /*
* This is what we need to know about insert with split - * This is what we need to know about insert with split -
* 22 + [4+8] + [btitem] + right sibling btitems. Note that we need in * 22 + {4 + 8 | left hi-key} + [btitem] + right sibling btitems. Note that
* CommandID and HeapNode (4 + 8 bytes) only for leaf page insert. * we need in CommandID and HeapNode (4 + 8 bytes) for leaf pages
* and in left page hi-key for non-leaf ones.
*/ */
typedef struct xl_btree_split typedef struct xl_btree_split
{ {
...@@ -271,8 +272,8 @@ typedef struct xl_btree_split ...@@ -271,8 +272,8 @@ typedef struct xl_btree_split
/* /*
* We log all btitems from the right sibling. If new btitem goes on * We log all btitems from the right sibling. If new btitem goes on
* the left sibling then we log it too and it will be the first * the left sibling then we log it too and it will be the first
* BTItemData at the end of this struct, but after (for the leaf * BTItemData at the end of this struct after CommandId and HeapNode
* pages) CommandId and HeapNode. * on the leaf pages and left page hi-key on non-leaf ones.
*/ */
} xl_btree_split; } xl_btree_split;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册