diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 8832d6ec9b56cc7cf459754433171801e98ca602..4de424598edc9ff0b563b7e85b42220365edd251 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.102 2000/12/27 23:59:10 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.103 2000/12/28 13:00:06 vadim Exp $ * * * INTERFACE ROUTINES @@ -88,15 +88,15 @@ #include "access/xlogutils.h" -XLogRecPtr log_heap_move(Relation reln, ItemPointerData from, HeapTuple newtup); +XLogRecPtr log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from, + Buffer newbuf, HeapTuple newtup); +XLogRecPtr log_heap_clean(Relation reln, Buffer buffer); /* comments are in heap_update */ static xl_heaptid _locked_tuple_; static void _heap_unlock_tuple(void *data); -static XLogRecPtr log_heap_update(Relation reln, ItemPointerData from, - HeapTuple newtup, bool move); - -static void HeapPageCleanup(Buffer buffer); +static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, + ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move); /* ---------------------------------------------------------------- @@ -1364,23 +1364,45 @@ heap_insert(Relation relation, HeapTuple tup) /* XLOG stuff */ { xl_heap_insert xlrec; + xl_heap_header xlhdr; XLogRecPtr recptr; + XLogRecData rdata[3]; + Page page = BufferGetPage(buffer); + uint8 info = XLOG_HEAP_INSERT; xlrec.target.node = relation->rd_node; - xlrec.target.cid = GetCurrentCommandId(); xlrec.target.tid = tup->t_self; - xlrec.t_natts = tup->t_data->t_natts; - xlrec.t_oid = tup->t_data->t_oid; - xlrec.t_hoff = tup->t_data->t_hoff; - xlrec.mask = tup->t_data->t_infomask; - - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INSERT, - (char*) &xlrec, SizeOfHeapInsert, - (char*) tup->t_data + offsetof(HeapTupleHeaderData, t_bits), - tup->t_len - 
offsetof(HeapTupleHeaderData, t_bits)); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfHeapInsert; + rdata[0].next = &(rdata[1]); + + xlhdr.t_oid = tup->t_data->t_oid; + xlhdr.t_natts = tup->t_data->t_natts; + xlhdr.t_hoff = tup->t_data->t_hoff; + xlhdr.mask = tup->t_data->t_infomask; + rdata[1].buffer = buffer; + rdata[1].data = (char*)&xlhdr; + rdata[1].len = SizeOfHeapHeader; + rdata[1].next = &(rdata[2]); + + rdata[2].buffer = buffer; + rdata[2].data = (char*) tup->t_data + offsetof(HeapTupleHeaderData, t_bits); + rdata[2].len = tup->t_len - offsetof(HeapTupleHeaderData, t_bits); + rdata[2].next = NULL; + + /* If this is the single and first tuple on page... */ + if (ItemPointerGetOffsetNumber(&(tup->t_self)) == FirstOffsetNumber && + PageGetMaxOffsetNumber(page) == FirstOffsetNumber) + { + info |= XLOG_HEAP_INIT_PAGE; + rdata[1].buffer = rdata[2].buffer = InvalidBuffer; + } - PageSetLSN(BufferGetPage(buffer), recptr); - PageSetSUI(BufferGetPage(buffer), ThisStartUpID); + recptr = XLogInsert(RM_HEAP_ID, info, rdata); + + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); } END_CRIT_CODE; @@ -1475,27 +1497,35 @@ l1: return result; } - /* XLOG stuff */ START_CRIT_CODE; + /* store transaction information of xact deleting the tuple */ + TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax)); + tp.t_data->t_cmax = GetCurrentCommandId(); + tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); + /* XLOG stuff */ { xl_heap_delete xlrec; XLogRecPtr recptr; + XLogRecData rdata[2]; xlrec.target.node = relation->rd_node; - xlrec.target.cid = GetCurrentCommandId(); xlrec.target.tid = tp.t_self; - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, - (char*) &xlrec, SizeOfHeapDelete, NULL, 0); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfHeapDelete; + rdata[0].next = &(rdata[1]); + + rdata[1].buffer = buffer; + rdata[1].data 
= NULL; + rdata[1].len = 0; + rdata[1].next = NULL; + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata); PageSetLSN(dp, recptr); PageSetSUI(dp, ThisStartUpID); } - - /* store transaction information of xact deleting the tuple */ - TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax)); - tp.t_data->t_cmax = GetCurrentCommandId(); - tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | - HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); END_CRIT_CODE; #ifdef TUPLE_TOASTER_ACTIVE @@ -1673,8 +1703,8 @@ l2: /* XLOG stuff */ { - XLogRecPtr recptr = log_heap_update(relation, - oldtup.t_self, newtup, false); + XLogRecPtr recptr = log_heap_update(relation, buffer, oldtup.t_self, + newbuf, newtup, false); if (newbuf != buffer) { @@ -1967,62 +1997,139 @@ heap_restrpos(HeapScanDesc scan) } } -static XLogRecPtr -log_heap_update(Relation reln, ItemPointerData from, - HeapTuple newtup, bool move) +XLogRecPtr +log_heap_clean(Relation reln, Buffer buffer) { - char tbuf[sizeof(xl_heap_update) + 2 * sizeof(TransactionId)]; - xl_heap_update *xlrec = (xl_heap_update*) tbuf; - int hsize = SizeOfHeapUpdate; + xl_heap_clean xlrec; XLogRecPtr recptr; + XLogRecData rdata[2]; + + xlrec.node = reln->rd_node; + xlrec.block = BufferGetBlockNumber(buffer); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfHeapClean; + rdata[0].next = &(rdata[1]); - xlrec->target.node = reln->rd_node; - xlrec->target.tid = from; - xlrec->newtid = newtup->t_self; - xlrec->t_natts = newtup->t_data->t_natts; - xlrec->t_oid = newtup->t_data->t_oid; - xlrec->t_hoff = newtup->t_data->t_hoff; - xlrec->mask = newtup->t_data->t_infomask; + rdata[1].buffer = buffer; + rdata[1].data = NULL; + rdata[1].len = 0; + rdata[1].next = NULL; + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata); + + return(recptr); +} + +static XLogRecPtr +log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, + Buffer newbuf, HeapTuple newtup, bool move) +{ + char 
tbuf[MAXALIGN(sizeof(xl_heap_header)) + 2 * sizeof(TransactionId)]; + xl_heap_update xlrec; + xl_heap_header *xlhdr = (xl_heap_header*) tbuf; + int hsize = SizeOfHeapHeader; XLogRecPtr recptr; + XLogRecData rdata[4]; + Page page = BufferGetPage(newbuf); + uint8 info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE; + + xlrec.target.node = reln->rd_node; + xlrec.target.tid = from; + xlrec.newtid = newtup->t_self; + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfHeapUpdate; + rdata[0].next = &(rdata[1]); + + rdata[1].buffer = oldbuf; + rdata[1].data = NULL; + rdata[1].len = 0; + rdata[1].next = &(rdata[2]); + + xlhdr->t_oid = newtup->t_data->t_oid; + xlhdr->t_natts = newtup->t_data->t_natts; + xlhdr->t_hoff = newtup->t_data->t_hoff; + xlhdr->mask = newtup->t_data->t_infomask; if (move) /* remember xmin & xmax */ { TransactionId xmax; - xlrec->target.cid = (CommandId) newtup->t_data->t_xmin; if (newtup->t_data->t_infomask & HEAP_XMAX_INVALID || newtup->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) xmax = InvalidTransactionId; else xmax = newtup->t_data->t_xmax; memcpy(tbuf + hsize, &xmax, sizeof(TransactionId)); - hsize += sizeof(TransactionId); + memcpy(tbuf + hsize + sizeof(TransactionId), + &(newtup->t_data->t_xmin), sizeof(TransactionId)); + hsize += (2 * sizeof(TransactionId)); + } + rdata[2].buffer = newbuf; + rdata[2].data = (char*)xlhdr; /* xlhdr already points into tbuf; &xlhdr would log the pointer variable's own bytes */ + rdata[2].len = hsize; + rdata[2].next = &(rdata[3]); + + rdata[3].buffer = newbuf; + rdata[3].data = (char*) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits); + rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); + rdata[3].next = NULL; + + /* If new tuple is the single and first tuple on page... 
*/ + if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber && + PageGetMaxOffsetNumber(page) == FirstOffsetNumber) + { + info |= XLOG_HEAP_INIT_PAGE; + rdata[2].buffer = rdata[3].buffer = InvalidBuffer; } - else - xlrec->target.cid = GetCurrentCommandId(); - - recptr = XLogInsert(RM_HEAP_ID, - (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE, - tbuf, hsize, - (char*) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits), - newtup->t_len - offsetof(HeapTupleHeaderData, t_bits)); + + recptr = XLogInsert(RM_HEAP_ID, info, rdata); return(recptr); } XLogRecPtr -log_heap_move(Relation reln, ItemPointerData from, HeapTuple newtup) +log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from, + Buffer newbuf, HeapTuple newtup) { - return(log_heap_update(reln, from, newtup, true)); + return(log_heap_update(reln, oldbuf, from, newbuf, newtup, true)); } static void -_heap_cleanup_page_(Page page) +heap_xlog_clean(bool redo, XLogRecPtr lsn, XLogRecord *record) { - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + xl_heap_clean *xlrec = (xl_heap_clean*) XLogRecGetData(record); + Relation reln; + Buffer buffer; + Page page; + OffsetNumber maxoff; OffsetNumber offnum; - ItemId lp; HeapTupleHeader htup; + ItemId lp; + + if (!redo || (record->xl_info & XLR_BKP_BLOCK_1)) + return; + + reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node); + + if (!RelationIsValid(reln)) + return; + + buffer = XLogReadBuffer(false, reln, xlrec->block); + if (!BufferIsValid(buffer)) + elog(STOP, "heap_clean_redo: no block"); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(STOP, "heap_clean_redo: uninitialized page"); + if (XLByteLE(lsn, PageGetLSN(page))) + { + UnlockAndReleaseBuffer(buffer); + return; + } + + maxoff = PageGetMaxOffsetNumber(page); for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) @@ -2039,22 +2146,7 @@ _heap_cleanup_page_(Page page) } PageRepairFragmentation(page); - -} - -static 
OffsetNumber -_heap_add_tuple_(Page page, HeapTupleHeader htup, uint32 len, OffsetNumber offnum) -{ - ItemId lp = PageGetItemId(page, offnum); - - if (len > PageGetFreeSpace(page) || - lp->lp_flags & LP_USED || lp->lp_len != 0) - _heap_cleanup_page_(page); - - offnum = PageAddItem(page, (Item)htup, len, offnum, - LP_USED | OverwritePageMode); - - return(offnum); + UnlockAndWriteBuffer(buffer); } static void @@ -2068,22 +2160,20 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; + if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) + return; + if (!RelationIsValid(reln)) return; + buffer = XLogReadBuffer(false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) - return; + elog(STOP, "heap_delete_%sdo: no block", (redo) ? "re" : "un"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) - { - PageInit(page, BufferGetPageSize(buffer), 0); - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - return; - } + elog(STOP, "heap_delete_%sdo: uninitialized page", (redo) ? "re" : "un"); if (redo) { @@ -2100,44 +2190,24 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); - /* page removed by vacuum ? */ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp)) - { - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - return; - } + elog(STOP, "heap_delete_%sdo: invalid lp", (redo) ? 
"re" : "un"); htup = (HeapTupleHeader) PageGetItem(page, lp); if (redo) { htup->t_xmax = record->xl_xid; - htup->t_cmax = xlrec->target.cid; - htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); - htup->t_infomask |= HEAP_XMAX_COMMITTED; + htup->t_cmax = FirstCommandId; + htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); PageSetLSN(page, lsn); PageSetSUI(page, ThisStartUpID); UnlockAndWriteBuffer(buffer); return; } - /* undo... is it our tuple ? */ - if (htup->t_xmax != record->xl_xid || htup->t_cmax != xlrec->target.cid) - { - if (!InRecovery) - elog(STOP, "heap_delete_undo: invalid target tuple in rollback"); - UnlockAndReleaseBuffer(buffer); - return; - } - else /* undo DELETE */ - { - htup->t_infomask |= HEAP_XMAX_INVALID; - UnlockAndWriteBuffer(buffer); - return; - } - + elog(STOP, "heap_delete_undo: unimplemented"); } static void @@ -2148,34 +2218,32 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; OffsetNumber offnum; - ItemId lp; HeapTupleHeader htup; + if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) + return; + if (!RelationIsValid(reln)) return; + buffer = XLogReadBuffer((redo) ? true : false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - { - PageInit(page, BufferGetPageSize(buffer), 0); - if (!redo) - { - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - return; - } - } + if (PageIsNew((PageHeader) page) && + (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE))) + elog(STOP, "heap_insert_%sdo: uninitialized page", (redo) ? 
"re" : "un"); if (redo) { char tbuf[MaxTupleSize]; - HeapTupleHeader htup = (HeapTupleHeader) tbuf; - uint32 newlen = record->xl_len - SizeOfHeapInsert; + xl_heap_header xlhdr; + uint32 newlen; + + if (record->xl_info & XLOG_HEAP_INIT_PAGE) + PageInit(page, BufferGetPageSize(buffer), 0); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { @@ -2184,27 +2252,25 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) } offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - /* page removed by vacuum ? */ if (PageGetMaxOffsetNumber(page) + 1 < offnum) - { - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - return; - } + elog(STOP, "heap_insert_redo: invalid max offset number"); + newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader; + memcpy((char*)&xlhdr, (char*)xlrec + SizeOfHeapInsert, SizeOfHeapHeader); memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), - (char*)xlrec + SizeOfHeapInsert, newlen); + (char*)xlrec + SizeOfHeapInsert + SizeOfHeapHeader, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); - htup->t_oid = xlrec->t_oid; - htup->t_natts = xlrec->t_natts; - htup->t_hoff = xlrec->t_hoff; + htup = (HeapTupleHeader) tbuf; + htup->t_oid = xlhdr.t_oid; + htup->t_natts = xlhdr.t_natts; + htup->t_hoff = xlhdr.t_hoff; htup->t_xmin = record->xl_xid; - htup->t_cmin = xlrec->target.cid; + htup->t_cmin = FirstCommandId; htup->t_xmax = htup->t_cmax = 0; - htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask; + htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask; - offnum = _heap_add_tuple_(page, htup, newlen, offnum); + offnum = PageAddItem(page, (Item)htup, newlen, offnum, + LP_USED | OverwritePageMode); if (offnum == InvalidOffsetNumber) elog(STOP, "heap_insert_redo: failed to add tuple"); PageSetLSN(page, lsn); @@ -2217,38 +2283,7 @@ heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! 
*/ elog(STOP, "heap_insert_undo: bad page LSN"); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - lp = PageGetItemId(page, offnum); - - if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) - { - if (!InRecovery) - elog(STOP, "heap_insert_undo: unused/deleted target tuple in rollback"); - if (ItemIdDeleted(lp)) - { - lp->lp_flags &= ~LP_USED; - PageRepairFragmentation(page); - UnlockAndWriteBuffer(buffer); - } - else - UnlockAndReleaseBuffer(buffer); - return; - } - htup = (HeapTupleHeader) PageGetItem(page, lp); - - /* is it our tuple ? */ - Assert(PageGetSUI(page) == ThisStartUpID); - if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) - { - if (!InRecovery) - elog(STOP, "heap_insert_undo: invalid target tuple in rollback"); - UnlockAndReleaseBuffer(buffer); - return; - } - - lp->lp_flags |= LP_DELETE; /* mark for deletion */ - MarkBufferForCleanup(buffer, HeapPageCleanup); - + elog(STOP, "heap_insert_undo: unimplemented"); } /* @@ -2271,24 +2306,19 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) if (!RelationIsValid(reln)) return; + if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) + goto newt; + /* Deal with old tuple version */ buffer = XLogReadBuffer(false, reln, - ItemPointerGetBlockNumber(&(xlrec->target.tid))); + ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) - goto newt; + elog(STOP, "heap_update_%sdo: no block", (redo) ? "re" : "un"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) - { - if (samepage) - goto newsame; - PageInit(page, BufferGetPageSize(buffer), 0); - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - goto newt; - } + elog(STOP, "heap_update_%sdo: uninitialized old page", (redo) ? 
"re" : "un"); if (redo) { @@ -2307,16 +2337,9 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) if (PageGetMaxOffsetNumber(page) >= offnum) lp = PageGetItemId(page, offnum); - /* page removed by vacuum ? */ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp)) - { - if (samepage) - goto newsame; - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - goto newt; - } + elog(STOP, "heap_update_%sdo: invalid lp", (redo) ? "re" : "un"); + htup = (HeapTupleHeader) PageGetItem(page, lp); if (redo) @@ -2331,7 +2354,7 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) else { htup->t_xmax = record->xl_xid; - htup->t_cmax = xlrec->target.cid; + htup->t_cmax = FirstCommandId; htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); } @@ -2343,33 +2366,17 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) goto newt; } - /* undo... is it our tuple ? */ - if ((! move && (htup->t_xmax != record->xl_xid || - htup->t_cmax != xlrec->target.cid)) || - xlrec->target.cid != (CommandId) htup->t_xmin || - htup->t_cmin != (CommandId) record->xl_xid) - { - if (!InRecovery) - elog(STOP, "heap_update_undo: invalid old tuple in rollback"); - UnlockAndReleaseBuffer(buffer); - } - else /* undo */ - { - if (move) - { - htup->t_infomask &= ~(HEAP_XMIN_INVALID | - HEAP_MOVED_IN | HEAP_MOVED_OFF); - htup->t_infomask |= HEAP_XMIN_COMMITTED; - } - else - htup->t_infomask |= HEAP_XMAX_INVALID; - UnlockAndWriteBuffer(buffer); - } + elog(STOP, "heap_update_undo: unimplemented"); /* Deal with new tuple */ newt:; + if (redo && + ((record->xl_info & XLR_BKP_BLOCK_2) || + ((record->xl_info & XLR_BKP_BLOCK_1) && samepage))) + return; + buffer = XLogReadBuffer((redo) ? 
true : false, reln, ItemPointerGetBlockNumber(&(xlrec->newtid))); if (!BufferIsValid(buffer)) @@ -2378,23 +2385,19 @@ newt:; page = (Page) BufferGetPage(buffer); newsame:; - if (PageIsNew((PageHeader) page)) - { - PageInit(page, BufferGetPageSize(buffer), 0); - if (!redo) - { - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - return; - } - } + if (PageIsNew((PageHeader) page) && + (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE))) + elog(STOP, "heap_update_%sdo: uninitialized page", (redo) ? "re" : "un"); if (redo) { - char tbuf[MaxTupleSize]; - int hsize; - uint32 newlen; + char tbuf[MaxTupleSize]; + xl_heap_header xlhdr; + int hsize; + uint32 newlen; + + if (record->xl_info & XLOG_HEAP_INIT_PAGE) + PageInit(page, BufferGetPageSize(buffer), 0); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { @@ -2403,34 +2406,30 @@ newsame:; } offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); - /* page removed by vacuum ? */ if (PageGetMaxOffsetNumber(page) + 1 < offnum) - { - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - return; - } + elog(STOP, "heap_update_redo: invalid max offset number"); - hsize = SizeOfHeapUpdate; + hsize = SizeOfHeapUpdate + SizeOfHeapHeader; if (move) - hsize += sizeof(TransactionId); - newlen = record->xl_len - hsize; + hsize += (2 * sizeof(TransactionId)); - htup = (HeapTupleHeader) tbuf; + newlen = record->xl_len - hsize; + memcpy((char*)&xlhdr, (char*)xlrec + SizeOfHeapUpdate, SizeOfHeapHeader); memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), (char*)xlrec + hsize, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); - htup->t_oid = xlrec->t_oid; - htup->t_natts = xlrec->t_natts; - htup->t_hoff = xlrec->t_hoff; + htup = (HeapTupleHeader) tbuf; + htup->t_oid = xlhdr.t_oid; + htup->t_natts = xlhdr.t_natts; + htup->t_hoff = xlhdr.t_hoff; if (move) { - htup->t_xmin = (TransactionId) xlrec->target.cid; + hsize = SizeOfHeapUpdate + 
SizeOfHeapHeader; + memcpy(&(htup->t_xmax), (char*)xlrec + hsize, sizeof(TransactionId)); + memcpy(&(htup->t_xmin), + (char*)xlrec + hsize + sizeof(TransactionId), sizeof(TransactionId)); TransactionIdStore(record->xl_xid, (TransactionId *) &(htup->t_cmin)); - memcpy(&(htup->t_xmax), - (char*)xlrec + SizeOfHeapUpdate, sizeof(TransactionId)); - htup->t_infomask = xlrec->mask; + htup->t_infomask = xlhdr.mask; htup->t_infomask &= ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF); htup->t_infomask |= HEAP_MOVED_IN; @@ -2438,13 +2437,13 @@ newsame:; else { htup->t_xmin = record->xl_xid; - htup->t_cmin = xlrec->target.cid; + htup->t_cmin = FirstCommandId; htup->t_xmax = htup->t_cmax = 0; - htup->t_infomask = HEAP_XMAX_INVALID | xlrec->mask; + htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask; } - offnum = _heap_add_tuple_(page, htup, newlen, - ItemPointerGetOffsetNumber(&(xlrec->newtid))); + offnum = PageAddItem(page, (Item)htup, newlen, offnum, + LP_USED | OverwritePageMode); if (offnum == InvalidOffsetNumber) elog(STOP, "heap_update_redo: failed to add tuple"); PageSetLSN(page, lsn); @@ -2457,40 +2456,8 @@ newsame:; if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */ elog(STOP, "heap_update_undo: bad new tuple page LSN"); - offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); - lp = PageGetItemId(page, offnum); + elog(STOP, "heap_update_undo: unimplemented"); - if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) - { - if (!InRecovery) - elog(STOP, "heap_update_undo: unused/deleted new tuple in rollback"); - if (ItemIdDeleted(lp)) - { - lp->lp_flags &= ~LP_USED; - PageRepairFragmentation(page); - UnlockAndWriteBuffer(buffer); - } - else - UnlockAndReleaseBuffer(buffer); - return; - } - htup = (HeapTupleHeader) PageGetItem(page, lp); - - /* is it our tuple ? */ - Assert(PageGetSUI(page) == ThisStartUpID); - if ((! 
move && (htup->t_xmin != record->xl_xid || - htup->t_cmin != xlrec->target.cid)) || - xlrec->target.cid != (CommandId) htup->t_xmin || - htup->t_cmin != (CommandId) record->xl_xid) - { - if (!InRecovery) - elog(STOP, "heap_update_undo: invalid new tuple in rollback"); - UnlockAndReleaseBuffer(buffer); - return; - } - - lp->lp_flags |= LP_DELETE; /* mark for deletion */ - MarkBufferForCleanup(buffer, HeapPageCleanup); } static void @@ -2539,6 +2506,7 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; + info &= XLOG_HEAP_OPMASK; if (info == XLOG_HEAP_INSERT) heap_xlog_insert(true, lsn, record); else if (info == XLOG_HEAP_DELETE) @@ -2547,6 +2515,8 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record) heap_xlog_update(true, lsn, record, false); else if (info == XLOG_HEAP_MOVE) heap_xlog_update(true, lsn, record, true); + else if (info == XLOG_HEAP_CLEAN) + heap_xlog_clean(true, lsn, record); else elog(STOP, "heap_redo: unknown op code %u", info); } @@ -2555,6 +2525,7 @@ void heap_undo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; + info &= XLOG_HEAP_OPMASK; if (info == XLOG_HEAP_INSERT) heap_xlog_insert(false, lsn, record); else if (info == XLOG_HEAP_DELETE) @@ -2563,23 +2534,17 @@ void heap_undo(XLogRecPtr lsn, XLogRecord *record) heap_xlog_update(false, lsn, record, false); else if (info == XLOG_HEAP_MOVE) heap_xlog_update(false, lsn, record, true); + else if (info == XLOG_HEAP_CLEAN) + heap_xlog_clean(false, lsn, record); else elog(STOP, "heap_undo: unknown op code %u", info); } -static void -HeapPageCleanup(Buffer buffer) -{ - Page page = (Page) BufferGetPage(buffer); - PageRepairFragmentation(page); -} - static void out_target(char *buf, xl_heaptid *target) { - sprintf(buf + strlen(buf), "node %u/%u; cid %u; tid %u/%u", + sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u", target->node.tblNode, target->node.relNode, - target->cid, ItemPointerGetBlockNumber(&(target->tid)), 
ItemPointerGetOffsetNumber(&(target->tid))); } @@ -2589,6 +2554,7 @@ heap_desc(char *buf, uint8 xl_info, char* rec) { uint8 info = xl_info & ~XLR_INFO_MASK; + info &= XLOG_HEAP_OPMASK; if (info == XLOG_HEAP_INSERT) { xl_heap_insert *xlrec = (xl_heap_insert*) rec; diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 0aa4947524719abff79931ca1afb50f0133713c7..993ad9a018e4dc9ad4f753af47c06c427e7b753c 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.70 2000/12/03 10:27:26 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.71 2000/12/28 13:00:07 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -520,39 +520,40 @@ _bt_insertonpg(Relation rel, { /* XLOG stuff */ { - char xlbuf[sizeof(xl_btree_insert) + - sizeof(CommandId) + sizeof(RelFileNode)]; - xl_btree_insert *xlrec = (xl_btree_insert*)xlbuf; - int hsize = SizeOfBtreeInsert; - BTItemData truncitem; - BTItem xlitem = btitem; - Size xlsize = IndexTupleDSize(btitem->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); + xl_btree_insert xlrec; + uint8 flag = XLOG_BTREE_INSERT; XLogRecPtr recptr; + XLogRecData rdata[2]; + BTItemData truncitem; /* declared here, not inside the if-block: rdata[1] points at it until XLogInsert() has run */ - xlrec->target.node = rel->rd_node; - ItemPointerSet(&(xlrec->target.tid), BufferGetBlockNumber(buf), newitemoff); - if (P_ISLEAF(lpageop)) - { - CommandId cid = GetCurrentCommandId(); - memcpy(xlbuf + hsize, &cid, sizeof(CommandId)); - hsize += sizeof(CommandId); - memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); - hsize += sizeof(RelFileNode); - } - /* - * Read comments in _bt_pgaddtup - */ - else if (newitemoff == P_FIRSTDATAKEY(lpageop)) + xlrec.target.node = rel->rd_node; + ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = 
(char*)&xlrec; + rdata[0].len = SizeOfBtreeInsert; + rdata[0].next = &(rdata[1]); + + /* Read comments in _bt_pgaddtup */ + if (!(P_ISLEAF(lpageop)) && newitemoff == P_FIRSTDATAKEY(lpageop)) { truncitem = *btitem; truncitem.bti_itup.t_info = sizeof(BTItemData); - xlitem = &truncitem; - xlsize = sizeof(BTItemData); + rdata[1].data = (char*)&truncitem; + rdata[1].len = sizeof(BTItemData); + } + else + { + rdata[1].data = (char*)btitem; + rdata[1].len = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); } + rdata[1].buffer = buf; + rdata[1].next = NULL; - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_INSERT, - xlbuf, hsize, (char*) xlitem, xlsize); + if (P_ISLEAF(lpageop)) + flag |= XLOG_BTREE_LEAF; + + recptr = XLogInsert(RM_BTREE_ID, flag, rdata); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); @@ -774,71 +775,63 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, */ START_CRIT_CODE; { - char xlbuf[sizeof(xl_btree_split) + - sizeof(CommandId) + sizeof(RelFileNode) + BLCKSZ]; - xl_btree_split *xlrec = (xl_btree_split*) xlbuf; - int hsize = SizeOfBtreeSplit; - int flag = (newitemonleft) ? - XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT; - BlockNumber blkno; - XLogRecPtr recptr; - - xlrec->target.node = rel->rd_node; - ItemPointerSet(&(xlrec->target.tid), *itup_blkno, *itup_off); - if (P_ISLEAF(lopaque)) - { - CommandId cid = GetCurrentCommandId(); - memcpy(xlbuf + hsize, &cid, sizeof(CommandId)); - hsize += sizeof(CommandId); - memcpy(xlbuf + hsize, &(_xlheapRel->rd_node), sizeof(RelFileNode)); - hsize += sizeof(RelFileNode); - } - else - { - Size itemsz = IndexTupleDSize(lhikey->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - memcpy(xlbuf + hsize, (char*) lhikey, itemsz); - hsize += itemsz; - } + xl_btree_split xlrec; + int flag = (newitemonleft) ? 
+ XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT; + BlockNumber blkno; + XLogRecPtr recptr; + XLogRecData rdata[4]; + + xlrec.target.node = rel->rd_node; + ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off); if (newitemonleft) { - /* - * Read comments in _bt_pgaddtup. - * Actually, seems that in non-leaf splits newitem shouldn't - * go to first data key position on left page. - */ - if (! P_ISLEAF(lopaque) && *itup_off == P_FIRSTDATAKEY(lopaque)) - { - BTItemData truncitem = *newitem; - truncitem.bti_itup.t_info = sizeof(BTItemData); - memcpy(xlbuf + hsize, &truncitem, sizeof(BTItemData)); - hsize += sizeof(BTItemData); - } - else - { - Size itemsz = IndexTupleDSize(newitem->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - memcpy(xlbuf + hsize, (char*) newitem, itemsz); - hsize += itemsz; - } blkno = BufferGetBlockNumber(rbuf); - BlockIdSet(&(xlrec->otherblk), blkno); + BlockIdSet(&(xlrec.otherblk), blkno); } else { blkno = BufferGetBlockNumber(buf); - BlockIdSet(&(xlrec->otherblk), blkno); + BlockIdSet(&(xlrec.otherblk), blkno); } - - BlockIdSet(&(xlrec->rightblk), ropaque->btpo_next); - + BlockIdSet(&(xlrec.parentblk), lopaque->btpo_parent); + BlockIdSet(&(xlrec.leftblk), lopaque->btpo_prev); + BlockIdSet(&(xlrec.rightblk), ropaque->btpo_next); /* * Dirrect access to page is not good but faster - we should * implement some new func in page API. 
*/ - recptr = XLogInsert(RM_BTREE_ID, flag, xlbuf, - hsize, (char*)rightpage + ((PageHeader) rightpage)->pd_upper, - ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper); + xlrec.leftlen = ((PageHeader)leftpage)->pd_special - + ((PageHeader)leftpage)->pd_upper; + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfBtreeSplit; + rdata[0].next = &(rdata[1]); + + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char*)leftpage + ((PageHeader)leftpage)->pd_upper; + rdata[1].len = xlrec.leftlen; + rdata[1].next = &(rdata[2]); + + rdata[2].buffer = InvalidBuffer; + rdata[2].data = (char*)rightpage + ((PageHeader)rightpage)->pd_upper; + rdata[2].len = ((PageHeader)rightpage)->pd_special - + ((PageHeader)rightpage)->pd_upper; + rdata[2].next = NULL; + + if (!P_RIGHTMOST(ropaque)) + { + rdata[2].next = &(rdata[3]); + rdata[3].buffer = sbuf; + rdata[3].data = NULL; + rdata[3].len = 0; + rdata[3].next = NULL; + } + + if (P_ISLEAF(lopaque)) + flag |= XLOG_BTREE_LEAF; + + recptr = XLogInsert(RM_BTREE_ID, flag, rdata); PageSetLSN(leftpage, recptr); PageSetSUI(leftpage, ThisStartUpID); @@ -1143,25 +1136,29 @@ _bt_getstackbuf(Relation rel, BTStack stack) void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) { - Buffer rootbuf; - Page lpage, - rpage, - rootpage; - BlockNumber lbkno, - rbkno; - BlockNumber rootblknum; - BTPageOpaque rootopaque; - ItemId itemid; - BTItem item; - Size itemsz; - BTItem new_item; - Buffer metabuf; + Buffer rootbuf; + Page lpage, + rpage, + rootpage; + BlockNumber lbkno, + rbkno; + BlockNumber rootblknum; + BTPageOpaque rootopaque; + ItemId itemid; + BTItem item; + Size itemsz; + BTItem new_item; + Buffer metabuf; + Page metapg; + BTMetaPageData *metad; /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rootpage = BufferGetPage(rootbuf); rootblknum = BufferGetBlockNumber(rootbuf); - metabuf = _bt_getbuf(rel, BTREE_METAPAGE,BT_WRITE); + metabuf = _bt_getbuf(rel, 
BTREE_METAPAGE, BT_WRITE); + metapg = BufferGetPage(metabuf); + metad = BTPageGetMeta(metapg); /* NO ELOG(ERROR) from here till newroot op is logged */ START_CRIT_CODE; @@ -1222,39 +1219,46 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) elog(STOP, "btree: failed to add rightkey to new root page"); pfree(new_item); + metad->btm_root = rootblknum; + (metad->btm_level)++; + /* XLOG stuff */ { xl_btree_newroot xlrec; - Page metapg = BufferGetPage(metabuf); - BTMetaPageData *metad = BTPageGetMeta(metapg); XLogRecPtr recptr; + XLogRecData rdata[2]; xlrec.node = rel->rd_node; + xlrec.level = metad->btm_level; BlockIdSet(&(xlrec.rootblk), rootblknum); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfBtreeNewroot; + rdata[0].next = &(rdata[1]); /* * Dirrect access to page is not good but faster - we should * implement some new func in page API. */ - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, - (char*)&xlrec, SizeOfBtreeNewroot, - (char*)rootpage + ((PageHeader) rootpage)->pd_upper, - ((PageHeader) rootpage)->pd_special - ((PageHeader) rootpage)->pd_upper); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char*)rootpage + ((PageHeader) rootpage)->pd_upper; + rdata[1].len = ((PageHeader)rootpage)->pd_special - + ((PageHeader)rootpage)->pd_upper; + rdata[1].next = NULL; - metad->btm_root = rootblknum; - (metad->btm_level)++; + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata); PageSetLSN(rootpage, recptr); PageSetSUI(rootpage, ThisStartUpID); PageSetLSN(metapg, recptr); PageSetSUI(metapg, ThisStartUpID); - _bt_wrtbuf(rel, metabuf); } END_CRIT_CODE; /* write and let go of the new root buffer */ _bt_wrtbuf(rel, rootbuf); + _bt_wrtbuf(rel, metabuf); /* update and release new sibling, and finally the old root */ _bt_wrtbuf(rel, rbuf); diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 613b141b677d21a21e1378599fb7b92c0f0305a0..a253cb953cb69f1ea4385e99794b73ec82ba620e 
100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.43 2000/12/03 10:27:26 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.44 2000/12/28 13:00:07 vadim Exp $ * * NOTES * Postgres btree pages look like ordinary relation pages. The opaque @@ -167,6 +167,9 @@ _bt_getroot(Relation rel, int access) /* NO ELOG(ERROR) till meta is updated */ START_CRIT_CODE; + metad->btm_root = rootblkno; + metad->btm_level = 1; + _bt_pageinit(rootpage, BufferGetPageSize(rootbuf)); rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); rootopaque->btpo_flags |= (BTP_LEAF | BTP_ROOT); @@ -175,22 +178,26 @@ _bt_getroot(Relation rel, int access) { xl_btree_newroot xlrec; XLogRecPtr recptr; + XLogRecData rdata; xlrec.node = rel->rd_node; + xlrec.level = 1; BlockIdSet(&(xlrec.rootblk), rootblkno); + rdata.buffer = InvalidBuffer; + rdata.data = (char*)&xlrec; + rdata.len = SizeOfBtreeNewroot; + rdata.next = NULL; - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, - (char*)&xlrec, SizeOfBtreeNewroot, NULL, 0); + recptr = XLogInsert(RM_BTREE_ID, + XLOG_BTREE_NEWROOT|XLOG_BTREE_LEAF, &rdata); PageSetLSN(rootpage, recptr); PageSetSUI(rootpage, ThisStartUpID); PageSetLSN(metapg, recptr); PageSetSUI(metapg, ThisStartUpID); } - END_CRIT_CODE; - metad->btm_root = rootblkno; - metad->btm_level = 1; + END_CRIT_CODE; _bt_wrtnorelbuf(rel, rootbuf); @@ -408,11 +415,21 @@ _bt_pagedel(Relation rel, ItemPointer tid) { xl_btree_delete xlrec; XLogRecPtr recptr; + XLogRecData rdata[2]; xlrec.target.node = rel->rd_node; xlrec.target.tid = *tid; - recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, - (char*) &xlrec, SizeOfBtreeDelete, NULL, 0); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfBtreeDelete; + rdata[0].next = &(rdata[1]); + + rdata[1].buffer = buf; + rdata[1].data = NULL; + 
rdata[1].len = 0; + rdata[1].next = NULL; + + recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 80d8d996d18183314e28839823c1b6c3ca382803..3f6abd501c9c905f53cfb5d00530e66c96473d30 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -12,7 +12,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.72 2000/11/30 08:46:21 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.73 2000/12/28 13:00:07 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -730,258 +730,24 @@ _bt_restscan(IndexScanDesc scan) } } -static bool -_bt_cleanup_page(Page page, RelFileNode hnode) -{ - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - OffsetNumber offno; - ItemId lp; - BTItem item; - bool result = false; - - for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; ) - { - lp = PageGetItemId(page, offno); - item = (BTItem) PageGetItem(page, lp); - if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid))) - offno = OffsetNumberNext(offno); - else - { - PageIndexTupleDelete(page, offno); - maxoff = PageGetMaxOffsetNumber(page); - result = true; - } - } - - return(result); -} - -static bool -_bt_add_item(Page page, OffsetNumber offno, - char* item, Size size, RelFileNode hnode) -{ - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - if (offno > PageGetMaxOffsetNumber(page) + 1) - { - if (! 
(pageop->btpo_flags & BTP_REORDER)) - { - elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected"); - pageop->btpo_flags |= BTP_REORDER; - } - offno = PageGetMaxOffsetNumber(page) + 1; - } - - if (PageAddItem(page, (Item) item, size, offno, - LP_USED) == InvalidOffsetNumber) - { -#ifdef NOT_USED /* it's not valid code currently */ - /* ops, not enough space - try to deleted dead tuples */ - bool result; - - if (! P_ISLEAF(pageop)) - return(false); - result = _bt_cleanup_page(page, hnode); - if (!result || PageAddItem(page, (Item) item, size, offno, - LP_USED) == InvalidOffsetNumber) -#endif - return(false); - } - - return(true); -} - -/* - * Remove from left sibling items belonging to right sibling - * and change P_HIKEY - */ static void -_bt_fix_left_page(Page page, XLogRecord *record, bool onleft) +_bt_restore_page(Page page, char *from, int len) { - char *xlrec = (char*) XLogRecGetData(record); - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - Size hsize = SizeOfBtreeSplit; - RelFileNode hnode; - BTItemData btdata; - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - OffsetNumber offno; - char *item; - Size itemsz; - char *previtem = NULL; - char *lhikey = NULL; - Size lhisize = 0; - - if (pageop->btpo_flags & BTP_LEAF) - { - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId), sizeof(RelFileNode)); - } - else - { - lhikey = (char*)xlrec + hsize; - memcpy(&btdata, lhikey, sizeof(BTItemData)); - lhisize = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += lhisize; - } - - if (! 
P_RIGHTMOST(pageop)) - PageIndexTupleDelete(page, P_HIKEY); + BTItemData btdata; + Size itemsz; + char *end = from + len; - if (onleft) /* skip target item */ + for ( ; from < end; ) { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - - for (item = (char*)xlrec + hsize; ; ) - { - memcpy(&btdata, item, sizeof(BTItemData)); - for (offno = P_FIRSTDATAKEY(pageop); - offno <= maxoff; - offno = OffsetNumberNext(offno)) - { - ItemId lp = PageGetItemId(page, offno); - BTItem btitem = (BTItem) PageGetItem(page, lp); - - if (BTItemSame(&btdata, btitem)) - { - PageIndexTupleDelete(page, offno); - break; - } - } - + memcpy(&btdata, from, sizeof(BTItemData)); itemsz = IndexTupleDSize(btdata.bti_itup) + (sizeof(BTItemData) - sizeof(IndexTupleData)); itemsz = MAXALIGN(itemsz); - - if (item + itemsz < (char*)xlrec + record->xl_len) - { - previtem = item; - item += itemsz; - } - else - break; - } - - /* time to insert hi-key */ - if (pageop->btpo_flags & BTP_LEAF) - { - lhikey = (P_RIGHTMOST(pageop)) ? item : previtem; - memcpy(&btdata, lhikey, sizeof(BTItemData)); - lhisize = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - } - - if (! 
_bt_add_item(page, - P_HIKEY, - lhikey, - lhisize, - hnode)) - elog(STOP, "btree_split_redo: failed to add hi key to left sibling"); - - return; -} - -/* - * UNDO insertion on *leaf* page: - * - find inserted tuple; - * - delete it if heap tuple was inserted by the same xaction - */ -static void -_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert, - XLogRecPtr lsn, XLogRecord *record) -{ - char *xlrec = (char*) XLogRecGetData(record); - Page page = (Page) BufferGetPage(buffer); - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - BlockNumber blkno; - OffsetNumber offno; - ItemId lp; - BTItem item; - - for ( ; ; ) - { - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - - for (offno = P_FIRSTDATAKEY(pageop); - offno <= maxoff; - offno = OffsetNumberNext(offno)) - { - lp = PageGetItemId(page, offno); - item = (BTItem) PageGetItem(page, lp); - if (BTItemSame(item, btitem)) - break; - } - if (offno <= maxoff) - break; - offno = InvalidOffsetNumber; - if (P_RIGHTMOST(pageop)) - break; - blkno = pageop->btpo_next; - UnlockAndReleaseBuffer(buffer); - buffer = XLogReadBuffer(false, reln, blkno); - if (!BufferIsValid(buffer)) - elog(STOP, "btree_%s_undo: lost right sibling", - (insert) ? "insert" : "split"); - page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_%s_undo: uninitialized right sibling", - (insert) ? "insert" : "split"); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (XLByteLT(PageGetLSN(page), lsn)) - break; + if (PageAddItem(page, (Item) from, itemsz, + FirstOffsetNumber, LP_USED) == InvalidOffsetNumber) + elog(STOP, "_bt_restore_page: can't add item to page"); + from += itemsz; } - - if (offno == InvalidOffsetNumber) /* not found */ - { - if (!InRecovery) - elog(STOP, "btree_%s_undo: lost target tuple in rollback", - (insert) ? 
"insert" : "split"); - UnlockAndReleaseBuffer(buffer); - return; - } - - lp = PageGetItemId(page, offno); - - if (InRecovery) /* check heap tuple */ - { - if (!ItemIdDeleted(lp)) - { - int result; - CommandId cid; - RelFileNode hnode; - Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit; - - memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId)); - memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode)); - result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid), - record->xl_xid, cid); - if (result < 0) /* not owner */ - { - UnlockAndReleaseBuffer(buffer); - return; - } - } - PageIndexTupleDelete(page, offno); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_flags |= BTP_REORDER; - UnlockAndWriteBuffer(buffer); - return; - } - - /* normal rollback */ - if (ItemIdDeleted(lp)) /* marked for deletion ?! */ - elog(STOP, "btree_%s_undo: deleted target tuple in rollback", - (insert) ? "insert" : "split"); - - lp->lp_flags |= LP_DELETE; - MarkBufferForCleanup(buffer, IndexPageCleanup); - return; } static void @@ -992,7 +758,7 @@ btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; - if (!redo) + if (!redo || (record->xl_info & XLR_BKP_BLOCK_1)) return; xlrec = (xl_btree_delete*) XLogRecGetData(record); @@ -1031,52 +797,41 @@ btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) Page page; BTPageOpaque pageop; + if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) + return; + xlrec = (xl_btree_insert*) XLogRecGetData(record); reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); if (!RelationIsValid(reln)) return; - buffer = XLogReadBuffer((redo) ? true : false, reln, + buffer = XLogReadBuffer(false, reln, ItemPointerGetBlockNumber(&(xlrec->target.tid))); if (!BufferIsValid(buffer)) - return; + elog(STOP, "btree_insert_%sdo: block unfound", (redo) ? 
"re" : "un"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_insert_%s: uninitialized page", - (redo) ? "redo" : "undo"); + elog(STOP, "btree_insert_%sdo: uninitialized page", (redo) ? "re" : "un"); pageop = (BTPageOpaque) PageGetSpecialPointer(page); if (redo) { if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); - else { - Size hsize = SizeOfBtreeInsert; - RelFileNode hnode; - - if (P_ISLEAF(pageop)) - { - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert + - sizeof(CommandId), sizeof(RelFileNode)); - } - - if (! _bt_add_item(page, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - (char*)xlrec + hsize, - record->xl_len - hsize, - hnode)) + UnlockAndReleaseBuffer(buffer); + return; + } + if (PageAddItem(page, (Item)((char*)xlrec + SizeOfBtreeInsert), + record->xl_len - SizeOfBtreeInsert, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), + LP_USED) == InvalidOffsetNumber) elog(STOP, "btree_insert_redo: failed to add item"); - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); } else { - BTItemData btdata; - if (XLByteLT(PageGetLSN(page), lsn)) elog(STOP, "btree_insert_undo: bad page LSN"); @@ -1086,11 +841,7 @@ btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) return; } - memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert + - sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); - - _bt_del_item(reln, buffer, &btdata, true, lsn, record); - + elog(STOP, "btree_insert_undo: unimplemented"); } return; @@ -1099,17 +850,15 @@ btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) static void btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) { - xl_btree_split *xlrec; + xl_btree_split *xlrec = (xl_btree_split*) XLogRecGetData(record); Relation reln; BlockNumber 
blkno; - BlockNumber parent; Buffer buffer; Page page; BTPageOpaque pageop; char *op = (redo) ? "redo" : "undo"; - bool isleaf; + bool isleaf = (record->xl_info & XLOG_BTREE_LEAF); - xlrec = (xl_btree_split*) XLogRecGetData(record); reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node); if (!RelationIsValid(reln)) return; @@ -1122,81 +871,33 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) elog(STOP, "btree_split_%s: lost left sibling", op); page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_split_%s: uninitialized left sibling", op); - + if (redo) + _bt_pageinit(page, BufferGetPageSize(buffer)); + else if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_undo: uninitialized left sibling"); pageop = (BTPageOpaque) PageGetSpecialPointer(page); - isleaf = P_ISLEAF(pageop); - parent = pageop->btpo_parent; if (redo) { - if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); + pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk)); + pageop->btpo_prev = BlockIdGetBlockNumber(&(xlrec->leftblk)); + if (onleft) + pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk)); else - { - /* Delete items related to new right sibling */ - _bt_fix_left_page(page, record, onleft); + pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + pageop->btpo_flags = (isleaf) ? 
BTP_LEAF : 0; - if (onleft) - { - BTItemData btdata; - Size hsize = SizeOfBtreeSplit; - Size itemsz; - RelFileNode hnode; - - pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk)); - if (isleaf) - { - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId), sizeof(RelFileNode)); - } - else - { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - - if (! _bt_add_item(page, - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - (char*)xlrec + hsize, - itemsz, - hnode)) - elog(STOP, "btree_split_redo: failed to add item"); - } - else - pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + _bt_restore_page(page, (char*)xlrec + SizeOfBtreeSplit, xlrec->leftlen); - pageop->btpo_flags &= ~BTP_ROOT; - - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); } else /* undo */ { if (XLByteLT(PageGetLSN(page), lsn)) elog(STOP, "btree_split_undo: bad left sibling LSN"); - - if (! isleaf || ! 
onleft) - UnlockAndReleaseBuffer(buffer); - else - { - BTItemData btdata; - - memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData)); - - _bt_del_item(reln, buffer, &btdata, false, lsn, record); - } + elog(STOP, "btree_split_undo: unimplemented"); } /* Right (new) sibling */ @@ -1207,106 +908,39 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) elog(STOP, "btree_split_%s: lost right sibling", op); page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page)) - { - if (!redo) - elog(STOP, "btree_split_undo: uninitialized right sibling"); - PageInit(page, BufferGetPageSize(buffer), 0); - } + if (redo) + _bt_pageinit(page, BufferGetPageSize(buffer)); + else if (PageIsNew((PageHeader) page)) + elog(STOP, "btree_split_undo: uninitialized right sibling"); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); if (redo) { - if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); - else - { - Size hsize = SizeOfBtreeSplit; - BTItemData btdata; - Size itemsz; - char *item; - - _bt_pageinit(page, BufferGetPageSize(buffer)); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (isleaf) - { - pageop->btpo_flags |= BTP_LEAF; - hsize += (sizeof(CommandId) + sizeof(RelFileNode)); - } - else - { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - if (onleft) /* skip target item */ - { - memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - hsize += itemsz; - } - - for (item = (char*)xlrec + hsize; - item < (char*)xlrec + record->xl_len; ) - { - memcpy(&btdata, item, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - if (PageAddItem(page, 
(Item) item, itemsz, FirstOffsetNumber, - LP_USED) == InvalidOffsetNumber) - elog(STOP, "btree_split_redo: can't add item to right sibling"); - item += itemsz; - } + pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk)); + pageop->btpo_prev = (onleft) ? + ItemPointerGetBlockNumber(&(xlrec->target.tid)) : + BlockIdGetBlockNumber(&(xlrec->otherblk)); + pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk)); + pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0; - pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) : - BlockIdGetBlockNumber(&(xlrec->otherblk)); - pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk)); - pageop->btpo_parent = parent; + _bt_restore_page(page, + (char*)xlrec + SizeOfBtreeSplit + xlrec->leftlen, + record->xl_len - SizeOfBtreeSplit - xlrec->leftlen); - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); } else /* undo */ { if (XLByteLT(PageGetLSN(page), lsn)) elog(STOP, "btree_split_undo: bad right sibling LSN"); - - if (! 
isleaf || onleft) - UnlockAndReleaseBuffer(buffer); - else - { - char tbuf[BLCKSZ]; - int cnt; - char *item; - Size itemsz; - - item = (char*)xlrec + SizeOfBtreeSplit + - sizeof(CommandId) + sizeof(RelFileNode); - for (cnt = 0; item < (char*)xlrec + record->xl_len; ) - { - BTItem btitem = (BTItem) - (tbuf + cnt * (MAXALIGN(sizeof(BTItemData)))); - memcpy(btitem, item, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btitem->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - item += itemsz; - cnt++; - } - cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (cnt < 0) - elog(STOP, "btree_split_undo: target item unfound in right sibling"); - - item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData))); - - _bt_del_item(reln, buffer, (BTItem)item, false, lsn, record); - } + elog(STOP, "btree_split_undo: unimplemented"); } + if (!redo || (record->xl_info & XLR_BKP_BLOCK_1)) + return; + /* Right (next) page */ blkno = BlockIdGetBlockNumber(&(xlrec->rightblk)); if (blkno == P_NONE) @@ -1314,52 +948,42 @@ btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record) buffer = XLogReadBuffer(false, reln, blkno); if (!BufferIsValid(buffer)) - elog(STOP, "btree_split_%s: lost next right page", op); + elog(STOP, "btree_split_redo: lost next right page"); page = (Page) BufferGetPage(buffer); if (PageIsNew((PageHeader) page)) - elog(STOP, "btree_split_%s: uninitialized next right page", op); - - if (redo) - { - if (XLByteLE(lsn, PageGetLSN(page))) - UnlockAndReleaseBuffer(buffer); - else - { - pageop = (BTPageOpaque) PageGetSpecialPointer(page); - pageop->btpo_prev = (onleft) ? 
- BlockIdGetBlockNumber(&(xlrec->otherblk)) : - ItemPointerGetBlockNumber(&(xlrec->target.tid)); + elog(STOP, "btree_split_redo: uninitialized next right page"); - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } - } - else /* undo */ + if (XLByteLE(lsn, PageGetLSN(page))) { - if (XLByteLT(PageGetLSN(page), lsn)) - elog(STOP, "btree_split_undo: bad next right page LSN"); - UnlockAndReleaseBuffer(buffer); + return; } + pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_prev = (onleft) ? + BlockIdGetBlockNumber(&(xlrec->otherblk)) : + ItemPointerGetBlockNumber(&(xlrec->target.tid)); + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); } static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) { - xl_btree_newroot *xlrec; + xl_btree_newroot *xlrec = (xl_btree_newroot*) XLogRecGetData(record); Relation reln; Buffer buffer; Page page; + BTPageOpaque pageop; Buffer metabuf; Page metapg; + BTMetaPageData md; if (!redo) return; - xlrec = (xl_btree_newroot*) XLogRecGetData(record); reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node); if (!RelationIsValid(reln)) return; @@ -1370,74 +994,36 @@ btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record) if (!BufferIsValid(buffer)) elog(STOP, "btree_newroot_redo: no metapage"); page = (Page) BufferGetPage(buffer); + _bt_pageinit(page, BufferGetPageSize(buffer)); + pageop = (BTPageOpaque) PageGetSpecialPointer(page); - if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn)) - { - BTPageOpaque pageop; - - _bt_pageinit(page, BufferGetPageSize(buffer)); - pageop = (BTPageOpaque) PageGetSpecialPointer(page); + pageop->btpo_flags |= BTP_ROOT; + pageop->btpo_prev = pageop->btpo_next = P_NONE; + pageop->btpo_parent = BTREE_METAPAGE; - pageop->btpo_flags |= BTP_ROOT; - pageop->btpo_prev = pageop->btpo_next = P_NONE; - pageop->btpo_parent = BTREE_METAPAGE; + if (record->xl_info & 
XLOG_BTREE_LEAF) + pageop->btpo_flags |= BTP_LEAF; - if (record->xl_len == SizeOfBtreeNewroot) /* no childs */ - pageop->btpo_flags |= BTP_LEAF; - else - { - BTItemData btdata; - Size itemsz; - char *item; + if (record->xl_len > SizeOfBtreeNewroot) + _bt_restore_page(page, + (char*)xlrec + SizeOfBtreeNewroot, + record->xl_len - SizeOfBtreeNewroot); - for (item = (char*)xlrec + SizeOfBtreeNewroot; - item < (char*)xlrec + record->xl_len; ) - { - memcpy(&btdata, item, sizeof(BTItemData)); - itemsz = IndexTupleDSize(btdata.bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, - LP_USED) == InvalidOffsetNumber) - elog(STOP, "btree_newroot_redo: can't add item"); - item += itemsz; - } - } - - PageSetLSN(page, lsn); - PageSetSUI(page, ThisStartUpID); - UnlockAndWriteBuffer(buffer); - } - else - UnlockAndReleaseBuffer(buffer); + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); metapg = BufferGetPage(metabuf); - if (PageIsNew((PageHeader) metapg)) - { - BTMetaPageData md; - - _bt_pageinit(metapg, BufferGetPageSize(metabuf)); - md.btm_magic = BTREE_MAGIC; - md.btm_version = BTREE_VERSION; - md.btm_root = P_NONE; - md.btm_level = 0; - memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md)); - } - - if (XLByteLT(PageGetLSN(metapg), lsn)) - { - BTMetaPageData *metad = BTPageGetMeta(metapg); - - metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk)); - (metad->btm_level)++; - PageSetLSN(metapg, lsn); - PageSetSUI(metapg, ThisStartUpID); - UnlockAndWriteBuffer(metabuf); - } - else - UnlockAndReleaseBuffer(metabuf); - - return; + _bt_pageinit(metapg, BufferGetPageSize(metabuf)); + md.btm_magic = BTREE_MAGIC; + md.btm_version = BTREE_VERSION; + md.btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk)); + md.btm_level = xlrec->level; + memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md)); + + PageSetLSN(metapg, lsn); + 
PageSetSUI(metapg, ThisStartUpID); + UnlockAndWriteBuffer(metabuf); } void @@ -1445,6 +1031,7 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; + info &= ~XLOG_BTREE_LEAF; if (info == XLOG_BTREE_DELETE) btree_xlog_delete(true, lsn, record); else if (info == XLOG_BTREE_INSERT) @@ -1464,6 +1051,7 @@ btree_undo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; + info &= ~XLOG_BTREE_LEAF; if (info == XLOG_BTREE_DELETE) btree_xlog_delete(false, lsn, record); else if (info == XLOG_BTREE_INSERT) @@ -1492,6 +1080,7 @@ btree_desc(char *buf, uint8 xl_info, char* rec) { uint8 info = xl_info & ~XLR_INFO_MASK; + info &= ~XLOG_BTREE_LEAF; if (info == XLOG_BTREE_INSERT) { xl_btree_insert *xlrec = (xl_btree_insert*) rec; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 57fc02f8a58bd2336a318c62c94b2f5ad123ba02..50f4f1a10091baa67fb5a50546802afab763637d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.90 2000/12/22 00:51:53 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.91 2000/12/28 13:00:08 vadim Exp $ * * NOTES * Transaction aborts can now occur two ways: @@ -665,6 +665,7 @@ RecordTransactionCommit() if (MyLastRecPtr.xrecoff != 0) { + XLogRecData rdata; xl_xact_commit xlrec; struct timeval delay; XLogRecPtr recptr; @@ -672,12 +673,16 @@ RecordTransactionCommit() BufmgrCommit(); xlrec.xtime = time(NULL); + rdata.buffer = InvalidBuffer; + rdata.data = (char *)(&xlrec); + rdata.len = SizeOfXactCommit; + rdata.next = NULL; + START_CRIT_CODE; /* * SHOULD SAVE ARRAY OF RELFILENODE-s TO DROP */ - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, - (char*) &xlrec, SizeOfXactCommit, NULL, 0); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, &rdata); /* * Sleep before commit! 
So we can flush more than one @@ -785,13 +790,18 @@ RecordTransactionAbort(void) if (MyLastRecPtr.xrecoff != 0 && !TransactionIdDidCommit(xid)) { + XLogRecData rdata; xl_xact_abort xlrec; XLogRecPtr recptr; xlrec.xtime = time(NULL); + rdata.buffer = InvalidBuffer; + rdata.data = (char *)(&xlrec); + rdata.len = SizeOfXactAbort; + rdata.next = NULL; + START_CRIT_CODE; - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, - (char*) &xlrec, SizeOfXactAbort, NULL, 0); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata); TransactionIdAbort(xid); MyProc->logRec.xrecoff = 0; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f39e279a729551fb846a951037dde3da62c277ba..7294b97ff341101550ef52463c4ca3562723f223 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.44 2000/12/18 18:45:03 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.45 2000/12/28 13:00:08 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include "storage/proc.h" #include "storage/spin.h" #include "storage/s_lock.h" +#include "storage/bufpage.h" #include "access/xlog.h" #include "access/xlogutils.h" #include "utils/builtins.h" @@ -43,6 +44,7 @@ XLogRecPtr MyLastRecPtr = {0, 0}; uint32 CritSectionCount = 0; bool InRecovery = false; StartUpID ThisStartUpID = 0; +XLogRecPtr RedoRecPtr; int XLOG_DEBUG = 0; @@ -71,11 +73,12 @@ typedef struct XLgwrResult typedef struct XLogCtlInsert { - XLgwrResult LgwrResult; - XLogRecPtr PrevRecord; - uint16 curridx; /* current block index in cache */ - XLogPageHeader currpage; - char *currpos; + XLgwrResult LgwrResult; + XLogRecPtr PrevRecord; + uint16 curridx; /* current block index in cache */ + XLogPageHeader 
currpage; + char *currpos; + XLogRecPtr RedoRecPtr; } XLogCtlInsert; typedef struct XLogCtlWrite @@ -96,6 +99,7 @@ typedef struct XLogCtlData uint32 XLogCacheByte; uint32 XLogCacheBlck; StartUpID ThisStartUpID; + XLogRecPtr RedoRecPtr; /* for postmaster */ slock_t insert_lck; slock_t info_lck; slock_t lgwr_lck; @@ -121,9 +125,7 @@ typedef enum DBState typedef struct ControlFileData { - /* - * XLOG state - */ + crc64 crc; uint32 logId; /* current log file id */ uint32 logSeg; /* current log file segment (1-based) */ XLogRecPtr checkPoint; /* last check point record ptr */ @@ -149,7 +151,6 @@ typedef struct ControlFileData static ControlFileData *ControlFile = NULL; - typedef struct CheckPoint { XLogRecPtr redo; /* next RecPtr available when we */ @@ -167,6 +168,13 @@ typedef struct CheckPoint #define XLOG_CHECKPOINT 0x00 #define XLOG_NEXTOID 0x10 +typedef struct BkpBlock +{ + crc64 crc; + RelFileNode node; + BlockNumber block; +} BkpBlock; + /* * We break each log file in 16Mb segments */ @@ -208,6 +216,33 @@ typedef struct CheckPoint (xrecoff % BLCKSZ >= SizeOfXLogPHD && \ (BLCKSZ - xrecoff % BLCKSZ) >= SizeOfXLogRecord) +#define _INTL_MAXLOGRECSZ (3 * MAXLOGRECSZ) + +extern uint32 crc_table[]; +#define INIT_CRC64(crc) (crc.crc1 = 0xffffffff, crc.crc2 = 0xffffffff) +#define FIN_CRC64(crc) (crc.crc1 ^= 0xffffffff, crc.crc2 ^= 0xffffffff) +#define COMP_CRC64(crc, data, len) \ +{\ + uint32 __c1 = crc.crc1;\ + uint32 __c2 = crc.crc2;\ + char *__data = data;\ + uint32 __len = len;\ +\ + while (__len >= 2)\ + {\ + __c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\ + __c2 = crc_table[(__c2 ^ *__data++) & 0xff] ^ (__c2 >> 8);\ + __len -= 2;\ + }\ + if (__len > 0)\ + __c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\ + crc.crc1 = __c1;\ + crc.crc2 = __c2;\ +} + +void SetRedoRecPtr(void); +void GetRedoRecPtr(void); + static void GetFreeXLBuffer(void); static void XLogWrite(char *buffer); static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent); @@ 
-238,17 +273,26 @@ static XLogRecord *nextRecord = NULL; static bool InRedo = false; XLogRecPtr -XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 buflen) +XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) { XLogCtlInsert *Insert = &XLogCtl->Insert; XLogRecord *record; XLogSubRecord *subrecord; XLogRecPtr RecPtr; - uint32 len = hdrlen + buflen, - freespace, - wlen; + uint32 freespace; uint16 curridx; + XLogRecData *rdt; + Buffer dtbuf[2] = {InvalidBuffer, InvalidBuffer}; + bool dtbuf_bkp[2] = {false, false}; + XLogRecData dtbuf_rdt[4]; + BkpBlock dtbuf_xlg[2]; + XLogRecPtr dtbuf_lsn[2]; + crc64 dtbuf_crc[2], + rdata_crc; + uint32 len; + unsigned i; bool updrqst = false; + bool repeat = false; bool no_tran = (rmid == RM_XLOG_ID) ? true : false; if (info & XLR_INFO_MASK) @@ -260,9 +304,6 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 info &= ~XLR_INFO_MASK; } - if (len == 0 || len > MAXLOGRECSZ) - elog(STOP, "XLogInsert: invalid record len %u", len); - if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { RecPtr.xlogid = 0; @@ -270,15 +311,72 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 return (RecPtr); } +begin:; + INIT_CRC64(rdata_crc); + for (len = 0, rdt = rdata; ; ) + { + if (rdt->buffer == InvalidBuffer) + { + len += rdt->len; + COMP_CRC64(rdata_crc, rdt->data, rdt->len); + if (rdt->next == NULL) + break; + rdt = rdt->next; + continue; + } + for (i = 0; i < 2; i++) + { + if (rdt->buffer == dtbuf[i]) + { + if (dtbuf_bkp[i]) + rdt->data = NULL; + else if (rdt->data) + { + len += rdt->len; + COMP_CRC64(rdata_crc, rdt->data, rdt->len); + } + break; + } + if (dtbuf[i] == InvalidBuffer) + { + dtbuf[i] = rdt->buffer; + dtbuf_lsn[i] = *((XLogRecPtr*)(BufferGetBlock(rdt->buffer))); + if (XLByteLE(dtbuf_lsn[i], RedoRecPtr)) + { + crc64 crc; + + dtbuf_bkp[i] = true; + rdt->data = NULL; + INIT_CRC64(crc); + COMP_CRC64(crc, ((char*)BufferGetBlock(dtbuf[i])), 
BLCKSZ); + dtbuf_crc[i] = crc; + } + else if (rdt->data) + { + len += rdt->len; + COMP_CRC64(rdata_crc, rdt->data, rdt->len); + } + break; + } + } + if (i >= 2) + elog(STOP, "XLogInsert: can backup 2 blocks at most"); + if (rdt->next == NULL) + break; + rdt = rdt->next; + } + + if (len == 0 || len > MAXLOGRECSZ) + elog(STOP, "XLogInsert: invalid record len %u", len); + START_CRIT_CODE; /* obtain xlog insert lock */ if (TAS(&(XLogCtl->insert_lck))) /* busy */ { bool do_lgwr = true; - unsigned i = 0; - for (;;) + for (i = 0;;) { /* try to read LgwrResult while waiting for insert lock */ if (!TAS(&(XLogCtl->info_lck))) @@ -319,6 +417,59 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 } } + /* Race condition: RedoRecPtr was changed */ + RedoRecPtr = Insert->RedoRecPtr; + repeat = false; + for (i = 0; i < 2; i++) + { + if (dtbuf[i] == InvalidBuffer) + continue; + if (dtbuf_bkp[i] == false && + XLByteLE(dtbuf_lsn[i], RedoRecPtr)) + { + dtbuf[i] = InvalidBuffer; + repeat = true; + } + } + if (repeat) + { + S_UNLOCK(&(XLogCtl->insert_lck)); + END_CRIT_CODE; + goto begin; + } + + /* Attach backup blocks to record data */ + for (i = 0; i < 2; i++) + { + if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i])) + continue; + + info |= (XLR_SET_BKP_BLOCK(i)); + + dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]); + dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]); + COMP_CRC64(dtbuf_crc[i], + ((char*)&(dtbuf_xlg[i]) + offsetof(BkpBlock, node)), + (sizeof(BkpBlock) - offsetof(BkpBlock, node))); + FIN_CRC64(dtbuf_crc[i]); + dtbuf_xlg[i].crc = dtbuf_crc[i]; + + rdt->next = &(dtbuf_rdt[2 * i]); + + dtbuf_rdt[2 * i].data = (char*)&(dtbuf_xlg[i]); + dtbuf_rdt[2 * i].len = sizeof(BkpBlock); + len += sizeof(BkpBlock); + + rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]); + + dtbuf_rdt[2 * i + 1].data = (char*)(BufferGetBlock(dtbuf[i])); + dtbuf_rdt[2 * i + 1].len = BLCKSZ; + len += BLCKSZ; + dtbuf_rdt[2 * i + 1].next = NULL; + } + + /* Insert record */ 
+ freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; if (freespace < SizeOfXLogRecord) { @@ -344,10 +495,15 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 record->xl_xact_prev = MyLastRecPtr; record->xl_xid = GetCurrentTransactionId(); - record->xl_len = (len > freespace) ? freespace : len; - record->xl_info = (len > freespace) ? - (info | XLR_TO_BE_CONTINUED) : info; + record->xl_len = len; + record->xl_info = info; record->xl_rmid = rmid; + + COMP_CRC64(rdata_crc, ((char*)record + offsetof(XLogRecord, xl_prev)), + (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev))); + FIN_CRC64(rdata_crc); + record->xl_crc = rdata_crc; + RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid; RecPtr.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + @@ -366,10 +522,10 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 sprintf(buf, "INSERT @ %u/%u: ", RecPtr.xlogid, RecPtr.xrecoff); xlog_outrec(buf, record); - if (hdr != NULL) + if (rdata->data != NULL) { strcat(buf, " - "); - RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, hdr); + RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data); } strcat(buf, "\n"); write(2, buf, strlen(buf)); @@ -377,31 +533,33 @@ XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 MyLastRecPtr = RecPtr; /* begin of record */ Insert->currpos += SizeOfXLogRecord; - if (freespace > 0) + + while (len) { - wlen = (hdrlen > freespace) ? freespace : hdrlen; - memcpy(Insert->currpos, hdr, wlen); - freespace -= wlen; - hdrlen -= wlen; - hdr += wlen; - Insert->currpos += wlen; - if (buflen > 0 && freespace > 0) + while (rdata->data == NULL) + rdata = rdata->next; + + if (freespace > 0) { - wlen = (buflen > freespace) ? 
freespace : buflen; - memcpy(Insert->currpos, buf, wlen); - freespace -= wlen; - buflen -= wlen; - buf += wlen; - Insert->currpos += wlen; + if (rdata->len > freespace) + { + memcpy(Insert->currpos, rdata->data, freespace); + rdata->data += freespace; + rdata->len -= freespace; + len -= freespace; + } + else + { + memcpy(Insert->currpos, rdata->data, rdata->len); + freespace -= rdata->len; + len -= rdata->len; + Insert->currpos += rdata->len; + rdata = rdata->next; + continue; + } } - Insert->currpos = ((char *) Insert->currpage) + - MAXALIGN(Insert->currpos - ((char *) Insert->currpage)); - len = hdrlen + buflen; - } - if (len != 0) - { -nbuf: + /* Use next buffer */ curridx = NextBufIdx(curridx); if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write)) { @@ -409,55 +567,16 @@ nbuf: updrqst = true; } else - { GetFreeXLBuffer(); - updrqst = false; - } freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord; Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD; subrecord = (XLogSubRecord *) Insert->currpos; + subrecord->xl_len = len; Insert->currpos += SizeOfXLogSubRecord; - if (hdrlen > freespace) - { - subrecord->xl_len = freespace; - /* we don't store info in subrecord' xl_info */ - subrecord->xl_info = XLR_TO_BE_CONTINUED; - memcpy(Insert->currpos, hdr, freespace); - hdrlen -= freespace; - hdr += freespace; - goto nbuf; - } - else if (hdrlen > 0) - { - subrecord->xl_len = hdrlen; - memcpy(Insert->currpos, hdr, hdrlen); - Insert->currpos += hdrlen; - freespace -= hdrlen; - hdrlen = 0; - } - else - subrecord->xl_len = 0; - if (buflen > freespace) - { - subrecord->xl_len += freespace; - /* we don't store info in subrecord' xl_info */ - subrecord->xl_info = XLR_TO_BE_CONTINUED; - memcpy(Insert->currpos, buf, freespace); - buflen -= freespace; - buf += freespace; - goto nbuf; - } - else if (buflen > 0) - { - subrecord->xl_len += buflen; - memcpy(Insert->currpos, buf, buflen); - Insert->currpos += buflen; - } - /* we don't store info in subrecord' xl_info */ - 
subrecord->xl_info = 0; - Insert->currpos = ((char *) Insert->currpage) + - MAXALIGN(Insert->currpos - ((char *) Insert->currpage)); } + + Insert->currpos = ((char *) Insert->currpage) + + MAXALIGN(Insert->currpos - ((char *) Insert->currpage)); freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos; /* @@ -469,12 +588,9 @@ nbuf: XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + Insert->currpos - ((char *) Insert->currpage); - /* - * All done! Update global LgwrRqst if some block was filled up. - */ + /* Need to update global LgwrRqst if some block was filled up */ if (freespace < SizeOfXLogRecord) - updrqst = true; /* curridx is filled and available for - * writing out */ + updrqst = true; /* curridx is filled and available for writing out */ else curridx = PrevBufIdx(curridx); LgwrRqst.Write = XLogCtl->xlblocks[curridx]; @@ -483,8 +599,6 @@ nbuf: if (updrqst) { - unsigned i = 0; - for (;;) { if (!TAS(&(XLogCtl->info_lck))) @@ -959,11 +1073,117 @@ MoveOfflineLogs(char *archdir, uint32 _logId, uint32 _logSeg) closedir(xldir); } +static void +RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) +{ + Relation reln; + Buffer buffer; + Page page; + BkpBlock bkpb; + char *blk; + int i; + + for (i = 0, blk = (char*)XLogRecGetData(record) + record->xl_len; i < 2; i++) + { + if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) + continue; + + memcpy((char*)&bkpb, blk, sizeof(BkpBlock)); + blk += sizeof(BkpBlock); + + reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node); + + if (reln) + { + buffer = XLogReadBuffer(true, reln, bkpb.block); + if (BufferIsValid(buffer)) + { + page = (Page) BufferGetPage(buffer); + memcpy((char*)page, blk, BLCKSZ); + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + } + } + + blk += BLCKSZ; + } +} + +static bool +RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) +{ + crc64 crc; + crc64 cbuf; + int i; + uint32 len = record->xl_len; + char *blk; + + for (i = 0; i < 2; i++) + { + if 
(!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) + continue; + + if (len <= (sizeof(BkpBlock) + BLCKSZ)) + { + elog(emode, "ReadRecord: record at %u/%u is too short to keep bkp block", + recptr.xlogid, recptr.xrecoff); + return(false); + } + len -= sizeof(BkpBlock); + len -= BLCKSZ; + } + + /* CRC of rmgr data */ + INIT_CRC64(crc); + COMP_CRC64(crc, ((char*)XLogRecGetData(record)), len); + COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)), + (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev))); + FIN_CRC64(crc); + + if (record->xl_crc.crc1 != crc.crc1 || record->xl_crc.crc2 != crc.crc2) + { + elog(emode, "ReadRecord: bad rmgr data CRC in record at %u/%u", + recptr.xlogid, recptr.xrecoff); + return(false); + } + + if (record->xl_len == len) + return(true); + + for (i = 0, blk = (char*)XLogRecGetData(record) + len; i < 2; i++) + { + if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i)))) + continue; + + INIT_CRC64(crc); + COMP_CRC64(crc, (blk + sizeof(BkpBlock)), BLCKSZ); + COMP_CRC64(crc, (blk + offsetof(BkpBlock, node)), + (sizeof(BkpBlock) - offsetof(BkpBlock, node))); + FIN_CRC64(crc); + memcpy((char*)&cbuf, blk, sizeof(crc64)); + + if (cbuf.crc1 != crc.crc1 || cbuf.crc2 != crc.crc2) + { + elog(emode, "ReadRecord: bad bkp block %d CRC in record at %u/%u", + i + 1, recptr.xlogid, recptr.xrecoff); + return(false); + } + blk += sizeof(BkpBlock); + blk += BLCKSZ; + } + + record->xl_len = len; /* !!! */ + + return(true); +} + static XLogRecord * ReadRecord(XLogRecPtr *RecPtr, char *buffer) { XLogRecord *record; XLogRecPtr tmpRecPtr = EndRecPtr; + uint32 len; bool nextmode = (RecPtr == NULL); int emode = (nextmode) ? 
LOG : STOP; bool noBlck = false; @@ -1032,11 +1252,10 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer) record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ); got_record:; - if (record->xl_len > - (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord)) + if (record->xl_len > _INTL_MAXLOGRECSZ) { - elog(emode, "ReadRecord: invalid record len %u in (%u, %u)", - record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); + elog(emode, "ReadRecord: too long record len %u in (%u, %u)", + record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); goto next_record_is_invalid; } if (record->xl_rmid > RM_MAX_ID) @@ -1046,21 +1265,15 @@ got_record:; goto next_record_is_invalid; } nextRecord = NULL; - if (record->xl_info & XLR_TO_BE_CONTINUED) + len = BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord; + if (record->xl_len > len) { - XLogSubRecord *subrecord; - uint32 len = record->xl_len; + XLogSubRecord *subrecord; + uint32 gotlen = len; - if (MAXALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ + - SizeOfXLogRecord != BLCKSZ) - { - elog(emode, "ReadRecord: invalid fragmented record len %u in (%u, %u)", - record->xl_len, RecPtr->xlogid, RecPtr->xrecoff); - goto next_record_is_invalid; - } - memcpy(buffer, record, record->xl_len + SizeOfXLogRecord); + memcpy(buffer, record, len + SizeOfXLogRecord); record = (XLogRecord *) buffer; - buffer += record->xl_len + SizeOfXLogRecord; + buffer += len + SizeOfXLogRecord; for (;;) { readOff++; @@ -1095,42 +1308,39 @@ got_record:; goto next_record_is_invalid; } subrecord = (XLogSubRecord *) ((char *) readBuf + SizeOfXLogPHD); - if (subrecord->xl_len == 0 || subrecord->xl_len > - (BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord)) + if (subrecord->xl_len == 0 || + record->xl_len < (subrecord->xl_len + gotlen)) { elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u", subrecord->xl_len, readId, readSeg, readOff); goto next_record_is_invalid; } - len += subrecord->xl_len; - if (len > MAXLOGRECSZ) + len = BLCKSZ - 
SizeOfXLogPHD - SizeOfXLogSubRecord; + + if (subrecord->xl_len > len) { - elog(emode, "ReadRecord: too long record len %u in (%u, %u)", - len, RecPtr->xlogid, RecPtr->xrecoff); - goto next_record_is_invalid; + memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, len); + gotlen += len; + buffer += len; + continue; } - memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, subrecord->xl_len); - buffer += subrecord->xl_len; - if (subrecord->xl_info & XLR_TO_BE_CONTINUED) + if (record->xl_len != (subrecord->xl_len + gotlen)) { - if (MAXALIGN(subrecord->xl_len) + - SizeOfXLogPHD + SizeOfXLogSubRecord != BLCKSZ) - { - elog(emode, "ReadRecord: invalid fragmented subrecord len %u in logfile %u seg %u off %u", - subrecord->xl_len, readId, readSeg, readOff); - goto next_record_is_invalid; - } - continue; + elog(emode, "ReadRecord: invalid len %u of constracted record in logfile %u seg %u off %u", + subrecord->xl_len + gotlen, readId, readSeg, readOff); + goto next_record_is_invalid; } + memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, subrecord->xl_len); break; } + if (!RecordIsValid(record, *RecPtr, emode)) + goto next_record_is_invalid; if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(subrecord->xl_len) + SizeOfXLogPHD + SizeOfXLogSubRecord) { nextRecord = (XLogRecord *) ((char *) subrecord + MAXALIGN(subrecord->xl_len) + SizeOfXLogSubRecord); } - record->xl_len = len; EndRecPtr.xlogid = readId; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ + SizeOfXLogPHD + SizeOfXLogSubRecord + @@ -1138,6 +1348,8 @@ got_record:; ReadRecPtr = *RecPtr; return (record); } + if (!RecordIsValid(record, *RecPtr, emode)) + goto next_record_is_invalid; if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(record->xl_len) + RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord) nextRecord = (XLogRecord *) ((char *) record + @@ -1322,6 +1534,13 @@ WriteControlFile(void) */ if (sizeof(ControlFileData) > BLCKSZ) elog(STOP, "sizeof(ControlFileData) is too large ... 
fix xlog.c"); + + INIT_CRC64(ControlFile->crc); + COMP_CRC64(ControlFile->crc, + ((char*)ControlFile + offsetof(ControlFileData, logId)), + (sizeof(ControlFileData) - offsetof(ControlFileData, logId))); + FIN_CRC64(ControlFile->crc); + memset(buffer, 0, BLCKSZ); memcpy(buffer, ControlFile, sizeof(ControlFileData)); @@ -1342,6 +1561,7 @@ WriteControlFile(void) static void ReadControlFile(void) { + crc64 crc; int fd; /* @@ -1356,6 +1576,15 @@ ReadControlFile(void) close(fd); + INIT_CRC64(crc); + COMP_CRC64(crc, + ((char*)ControlFile + offsetof(ControlFileData, logId)), + (sizeof(ControlFileData) - offsetof(ControlFileData, logId))); + FIN_CRC64(crc); + + if (crc.crc1 != ControlFile->crc.crc1 || crc.crc2 != ControlFile->crc.crc2) + elog(STOP, "Invalid CRC in control file"); + /* * Do compatibility checking immediately. We do this here for 2 reasons: * @@ -1396,6 +1625,12 @@ UpdateControlFile(void) { int fd; + INIT_CRC64(ControlFile->crc); + COMP_CRC64(ControlFile->crc, + ((char*)ControlFile + offsetof(ControlFileData, logId)), + (sizeof(ControlFileData) - offsetof(ControlFileData, logId))); + FIN_CRC64(ControlFile->crc); + fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) elog(STOP, "open(\"%s\") failed: %m", ControlFilePath); @@ -1461,6 +1696,7 @@ BootStrapXLOG() bool usexistent = false; XLogPageHeader page = (XLogPageHeader) buffer; XLogRecord *record; + crc64 crc; checkPoint.redo.xlogid = 0; checkPoint.redo.xrecoff = SizeOfXLogPHD; @@ -1487,6 +1723,13 @@ BootStrapXLOG() record->xl_rmid = RM_XLOG_ID; memcpy((char *) record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint)); + INIT_CRC64(crc); + COMP_CRC64(crc, ((char*)&checkPoint), sizeof(checkPoint)); + COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)), + (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev))); + FIN_CRC64(crc); + record->xl_crc = crc; + logFile = XLogFileInit(0, 0, &usexistent); if (write(logFile, buffer, BLCKSZ) != BLCKSZ) @@ -1532,7 +1775,7 @@ 
StartupXLOG() XLogRecPtr RecPtr, LastRec; XLogRecord *record; - char buffer[MAXLOGRECSZ + SizeOfXLogRecord]; + char buffer[_INTL_MAXLOGRECSZ + SizeOfXLogRecord]; elog(LOG, "starting up"); CritSectionCount++; @@ -1611,6 +1854,8 @@ StartupXLOG() ShmemVariableCache->oidCount = 0; ThisStartUpID = checkPoint.ThisStartUpID; + RedoRecPtr = XLogCtl->Insert.RedoRecPtr = + XLogCtl->RedoRecPtr = checkPoint.redo; if (XLByteLT(RecPtr, checkPoint.redo)) elog(STOP, "Invalid redo in checkPoint record"); @@ -1648,8 +1893,7 @@ StartupXLOG() /* Is REDO required ? */ if (XLByteLT(checkPoint.redo, RecPtr)) record = ReadRecord(&(checkPoint.redo), buffer); - else -/* read past CheckPoint record */ + else /* read past CheckPoint record */ record = ReadRecord(NULL, buffer); if (record->xl_len != 0) @@ -1676,6 +1920,9 @@ StartupXLOG() write(2, buf, strlen(buf)); } + if (record->xl_info & (XLR_BKP_BLOCK_1|XLR_BKP_BLOCK_2)) + RestoreBkpBlocks(record, EndRecPtr); + RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); record = ReadRecord(NULL, buffer); } while (record->xl_len != 0); @@ -1758,13 +2005,31 @@ StartupXLOG() } /* - * Postmaster uses it to set ThisStartUpID from XLogCtlData - * located in shmem after successful startup. + * Postmaster uses it to set ThisStartUpID & RedoRecPtr from + * XLogCtlData located in shmem after successful startup. */ void SetThisStartUpID(void) { ThisStartUpID = XLogCtl->ThisStartUpID; + RedoRecPtr = XLogCtl->RedoRecPtr; +} + +/* + * CheckPoint-er called by postmaster creates copy of RedoRecPtr + * for postmaster in shmem. Postmaster uses GetRedoRecPtr after + * that to update its own copy of RedoRecPtr. 
+ */ +void +SetRedoRecPtr(void) +{ + XLogCtl->RedoRecPtr = RedoRecPtr; +} + +void +GetRedoRecPtr(void) +{ + RedoRecPtr = XLogCtl->RedoRecPtr; } /* @@ -1791,6 +2056,7 @@ CreateCheckPoint(bool shutdown) CheckPoint checkPoint; XLogRecPtr recptr; XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogRecData rdata; uint32 freespace; uint16 curridx; uint32 _logId; @@ -1844,6 +2110,7 @@ CreateCheckPoint(bool shutdown) checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid; checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ + Insert->currpos - ((char *) Insert->currpage); + RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; S_UNLOCK(&(XLogCtl->insert_lck)); SpinAcquire(XidGenLockId); @@ -1864,8 +2131,12 @@ CreateCheckPoint(bool shutdown) if (shutdown && checkPoint.undo.xrecoff != 0) elog(STOP, "Active transaction while data base is shutting down"); - recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, (char *) &checkPoint, - sizeof(checkPoint), NULL, 0); + rdata.buffer = InvalidBuffer; + rdata.data = (char *)(&checkPoint); + rdata.len = sizeof(checkPoint); + rdata.next = NULL; + + recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, &rdata); if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr)) elog(STOP, "XLog concurrent activity while data base is shutting down"); @@ -1941,10 +2212,14 @@ void XLogPutNextOid(Oid nextOid); void XLogPutNextOid(Oid nextOid) { - (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, - (char *) &nextOid, sizeof(Oid), NULL, 0); -} + XLogRecData rdata; + rdata.buffer = InvalidBuffer; + rdata.data = (char *)(&nextOid); + rdata.len = sizeof(Oid); + rdata.next = NULL; + (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata); +} void xlog_redo(XLogRecPtr lsn, XLogRecord *record) diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 3f2bf76a36ecdeb0066bdaa43a177141543439ad..cd4186eec83668304a2342d79ac07580fca830ab 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -8,7 
+8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.101 2000/11/25 20:33:51 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.102 2000/12/28 13:00:12 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -147,6 +147,8 @@ static MemoryContext nogc = NULL; /* special no-gc mem context */ extern int optind; extern char *optarg; +extern void SetRedoRecPtr(void); + /* * At bootstrap time, we first declare all the indices to be built, and * then build them. The IndexList structure stores enough information @@ -349,6 +351,7 @@ BootstrapMain(int argc, char *argv[]) { CreateDummyCaches(); CreateCheckPoint(false); + SetRedoRecPtr(); } else if (xlogop == BS_XLOG_STARTUP) StartupXLOG(); diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 87ac198044dd644580797de490f2ce6a6f0c0c5f..64fc0102a876871913bc4fad74683a8197413b9c 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.46 2000/12/08 20:10:19 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/sequence.c,v 1.47 2000/12/28 13:00:17 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -306,25 +306,38 @@ nextval(PG_FUNCTION_ARGS) { xl_seq_rec xlrec; XLogRecPtr recptr; - - if (fetch) /* not all numbers were fetched */ - log -= fetch; + XLogRecData rdata[2]; + Page page = BufferGetPage(buf); xlrec.node = elm->rel->rd_node; - xlrec.value = next; + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = sizeof(xl_seq_rec); + rdata[0].next = &(rdata[1]); + + seq->last_value = next; + seq->is_called = 't'; + seq->log_cnt = 0; + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char*)page + ((PageHeader) page)->pd_upper; + 
rdata[1].len = ((PageHeader)page)->pd_special - + ((PageHeader)page)->pd_upper; + rdata[1].next = NULL; + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, rdata); - recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, - (char*) &xlrec, sizeof(xlrec), NULL, 0); + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); - PageSetLSN(BufferGetPage(buf), recptr); - PageSetSUI(BufferGetPage(buf), ThisStartUpID); + if (fetch) /* not all numbers were fetched */ + log -= fetch; } - /* save info in sequence relation */ + /* update on-disk data */ seq->last_value = last; /* last fetched number */ + seq->is_called = 't'; Assert(log >= 0); seq->log_cnt = log; /* how much is logged */ - seq->is_called = 't'; END_CRIT_CODE; LockBuffer(buf, BUFFER_LOCK_UNLOCK); @@ -385,25 +398,37 @@ do_setval(char *seqname, int32 next, bool iscalled) elm->last = next; /* last returned number */ elm->cached = next; /* last cached number (forget cached values) */ - /* save info in sequence relation */ START_CRIT_CODE; - seq->last_value = next; /* last fetched number */ - seq->is_called = iscalled ? 't' : 'f'; - seq->log_cnt = (iscalled) ? 
0 : 1; - { xl_seq_rec xlrec; XLogRecPtr recptr; + XLogRecData rdata[2]; + Page page = BufferGetPage(buf); xlrec.node = elm->rel->rd_node; - xlrec.value = next; - - recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_SET|XLOG_NO_TRAN, - (char*) &xlrec, sizeof(xlrec), NULL, 0); - - PageSetLSN(BufferGetPage(buf), recptr); - PageSetSUI(BufferGetPage(buf), ThisStartUpID); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = sizeof(xl_seq_rec); + rdata[0].next = &(rdata[1]); + + seq->last_value = next; + seq->is_called = 't'; + seq->log_cnt = 0; + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char*)page + ((PageHeader) page)->pd_upper; + rdata[1].len = ((PageHeader)page)->pd_special - + ((PageHeader)page)->pd_upper; + rdata[1].next = NULL; + + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN, rdata); + + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); } + /* save info in sequence relation */ + seq->last_value = next; /* last fetched number */ + seq->is_called = iscalled ? 't' : 'f'; + seq->log_cnt = (iscalled) ? 
0 : 1; END_CRIT_CODE; LockBuffer(buf, BUFFER_LOCK_UNLOCK); @@ -708,50 +733,38 @@ get_param(DefElem *def) void seq_redo(XLogRecPtr lsn, XLogRecord *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; - Relation reln; - Buffer buffer; - Page page; - ItemId lp; - HeapTupleData tuple; - Form_pg_sequence seq; - xl_seq_rec *xlrec; - - if (info != XLOG_SEQ_LOG && info != XLOG_SEQ_SET) - elog(STOP, "seq_redo: unknown op code %u", info); + uint8 info = record->xl_info & ~XLR_INFO_MASK; + Relation reln; + Buffer buffer; + Page page; + char *item; + Size itemsz; + xl_seq_rec *xlrec = (xl_seq_rec*) XLogRecGetData(record); + sequence_magic *sm; - xlrec = (xl_seq_rec*) XLogRecGetData(record); + if (info != XLOG_SEQ_LOG) + elog(STOP, "seq_redo: unknown op code %u", info); reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node); if (!RelationIsValid(reln)) return; - buffer = XLogReadBuffer(false, reln, 0); + buffer = XLogReadBuffer(true, reln, 0); if (!BufferIsValid(buffer)) elog(STOP, "seq_redo: can't read block of %u/%u", xlrec->node.tblNode, xlrec->node.relNode); page = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) page) || - ((sequence_magic *) PageGetSpecialPointer(page))->magic != SEQ_MAGIC) - elog(STOP, "seq_redo: uninitialized page of %u/%u", - xlrec->node.tblNode, xlrec->node.relNode); - - if (XLByteLE(lsn, PageGetLSN(page))) - { - UnlockAndReleaseBuffer(buffer); - return; - } - - lp = PageGetItemId(page, FirstOffsetNumber); - Assert(ItemIdIsUsed(lp)); - tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp); - seq = (Form_pg_sequence) GETSTRUCT(&tuple); + PageInit((Page) page, BufferGetPageSize(buffer), sizeof(sequence_magic)); + sm = (sequence_magic *) PageGetSpecialPointer(page); + sm->magic = SEQ_MAGIC; - seq->last_value = xlrec->value; /* last logged value */ - seq->is_called = 't'; - seq->log_cnt = 0; + item = (char*)xlrec + sizeof(xl_seq_rec); + itemsz = record->xl_len - sizeof(xl_seq_rec); + itemsz = MAXALIGN(itemsz); + if 
(PageAddItem(page, (Item)item, itemsz, + FirstOffsetNumber, LP_USED) == InvalidOffsetNumber) PageSetLSN(page, lsn); PageSetSUI(page, ThisStartUpID); @@ -771,14 +784,12 @@ void seq_desc(char *buf, uint8 xl_info, char* rec) if (info == XLOG_SEQ_LOG) strcat(buf, "log: "); - else if (info == XLOG_SEQ_SET) - strcat(buf, "set: "); else { strcat(buf, "UNKNOWN"); return; } - sprintf(buf + strlen(buf), "node %u/%u; value %d", - xlrec->node.tblNode, xlrec->node.relNode, xlrec->value); + sprintf(buf + strlen(buf), "node %u/%u", + xlrec->node.tblNode, xlrec->node.relNode); } diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 909c9cea732efc91959b7e47b9672fe1fa428ec4..c0e9fbb5b1add38f2d7c19ab6a98aaa728bfd749 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.179 2000/12/22 23:12:05 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.180 2000/12/28 13:00:18 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -47,8 +47,10 @@ #include "utils/syscache.h" #include "utils/temprel.h" +extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer); extern XLogRecPtr log_heap_move(Relation reln, - ItemPointerData from, HeapTuple newtup); + Buffer oldbuf, ItemPointerData from, + Buffer newbuf, HeapTuple newtup); static MemoryContext vac_context = NULL; @@ -65,7 +67,7 @@ static void vacuum_rel(Oid relid); static void scan_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages); static void repair_frag(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages, int nindices, Relation *Irel); static void vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacpagelist); -static void vacuum_page(Page page, VacPage vacpage); +static void vacuum_page(Relation onerel, Buffer buffer, VacPage 
vacpage); static void vacuum_index(VacPageList vacpagelist, Relation indrel, int num_tuples, int keep_tuples); static void scan_index(Relation indrel, int num_tuples); static void update_relstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *vacrelstats); @@ -1070,7 +1072,9 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, if (last_vacuum_page->offsets_free > 0) /* there are dead tuples */ { /* on this page - clean */ Assert(!isempty); - vacuum_page(page, last_vacuum_page); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + vacuum_page(onerel, buf, last_vacuum_page); + LockBuffer(buf, BUFFER_LOCK_UNLOCK); dowrite = true; } else @@ -1469,7 +1473,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, int sv_offsets_used = destvacpage->offsets_used; destvacpage->offsets_used = 0; - vacuum_page(ToPage, destvacpage); + vacuum_page(onerel, cur_buffer, destvacpage); destvacpage->offsets_used = sv_offsets_used; } @@ -1496,7 +1500,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, { XLogRecPtr recptr = - log_heap_move(onerel, tuple.t_self, &newtup); + log_heap_move(onerel, Cbuf, tuple.t_self, + cur_buffer, &newtup); if (Cbuf != cur_buffer) { @@ -1609,7 +1614,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ToPage = BufferGetPage(cur_buffer); /* if this page was not used before - clean it */ if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0) - vacuum_page(ToPage, cur_page); + vacuum_page(onerel, cur_buffer, cur_page); } else LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1661,7 +1666,8 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)" { XLogRecPtr recptr = - log_heap_move(onerel, tuple.t_self, &newtup); + log_heap_move(onerel, buf, tuple.t_self, + cur_buffer, &newtup); PageSetLSN(page, recptr); PageSetSUI(page, ThisStartUpID); @@ -1810,11 +1816,12 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)" { Assert((*curpage)->blkno < (BlockNumber) blkno); buf = 
ReadBuffer(onerel, (*curpage)->blkno); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buf); if ((*curpage)->offsets_used == 0) /* this page was not used */ { if (!PageIsEmpty(page)) - vacuum_page(page, *curpage); + vacuum_page(onerel, buf, *curpage); } else /* this page was used */ @@ -1848,6 +1855,7 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)" Assert((*curpage)->offsets_used == num_tuples); checked_moved += num_tuples; } + LockBuffer(buf, BUFFER_LOCK_UNLOCK); WriteBuffer(buf); } Assert(num_moved == checked_moved); @@ -1891,6 +1899,8 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)" vacpage->offsets_free > 0) { buf = ReadBuffer(onerel, vacpage->blkno); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + START_CRIT_CODE; page = BufferGetPage(buf); num_tuples = 0; for (offnum = FirstOffsetNumber; @@ -1919,6 +1929,13 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)" } Assert(vacpage->offsets_free == num_tuples); PageRepairFragmentation(page); + { + XLogRecPtr recptr = log_heap_clean(onerel, buf); + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); + } + END_CRIT_CODE; + LockBuffer(buf, BUFFER_LOCK_UNLOCK); WriteBuffer(buf); } @@ -1969,7 +1986,6 @@ static void vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages) { Buffer buf; - Page page; VacPage *vacpage; int nblocks; int i; @@ -1983,8 +1999,9 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages) if ((*vacpage)->offsets_free > 0) { buf = ReadBuffer(onerel, (*vacpage)->blkno); - page = BufferGetPage(buf); - vacuum_page(page, *vacpage); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + vacuum_page(onerel, buf, *vacpage); + LockBuffer(buf, BUFFER_LOCK_UNLOCK); WriteBuffer(buf); } } @@ -2020,20 +2037,28 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages) * and repair its fragmentation. 
*/ static void -vacuum_page(Page page, VacPage vacpage) +vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage) { + Page page = BufferGetPage(buffer); ItemId itemid; int i; /* There shouldn't be any tuples moved onto the page yet! */ Assert(vacpage->offsets_used == 0); + START_CRIT_CODE; for (i = 0; i < vacpage->offsets_free; i++) { itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]); itemid->lp_flags &= ~LP_USED; } PageRepairFragmentation(page); + { + XLogRecPtr recptr = log_heap_clean(onerel, buffer); + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); + } + END_CRIT_CODE; } diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index ef2896b3c2c506ebf8d82f9649c3e30c3e579e30..85bedcb24a24444e57721cc38fee4692e707268d 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.201 2000/12/20 21:51:52 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.202 2000/12/28 13:00:20 vadim Exp $ * * NOTES * @@ -194,6 +194,8 @@ extern char *optarg; extern int optind, opterr; +extern void GetRedoRecPtr(void); + /* * postmaster.c - function prototypes */ @@ -1533,6 +1535,7 @@ reaper(SIGNAL_ARGS) /* * Startup succeeded - remember its ID + * and RedoRecPtr */ SetThisStartUpID(); @@ -1633,7 +1636,10 @@ CleanupProc(int pid, { CheckPointPID = 0; if (!FatalError) + { checkpointed = time(NULL); + GetRedoRecPtr(); + } } else { diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7586491ec4849a6eef61e5e7e39859a6ba49e7e6..2adea99e82ca60db94a78b5542be121e6ad2d2fa 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.99 2000/12/22 20:04:43 vadim Exp $ + * $Header: 
/cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.100 2000/12/28 13:00:21 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -2208,3 +2208,16 @@ MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer)) SpinRelease(BufMgrLock); return; } + +RelFileNode +BufferGetFileNode(Buffer buffer) +{ + BufferDesc *bufHdr; + + if (BufferIsLocal(buffer)) + bufHdr = &(LocalBufferDescriptors[-buffer - 1]); + else + bufHdr = &BufferDescriptors[buffer - 1]; + + return(bufHdr->tag.rnode); +} diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 9f23d4272a55c709b9ebff581fb498b65bcde430..e6b57c648b374b0375d330ecfb82417a9834267e 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/init/globals.c,v 1.47 2000/11/12 20:51:52 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/init/globals.c,v 1.48 2000/12/28 13:00:24 vadim Exp $ * * NOTES * Globals used all over the place should be declared here and not @@ -123,3 +123,49 @@ char *SharedSystemRelationNames[] = { VariableRelationName, 0 }; + +uint32 crc_table[] = { +0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, +0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, +0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, +0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, +0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, +0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, +0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c, +0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, +0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, +0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, +0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 
0x76dc4190, 0x01db7106, +0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, +0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, +0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, +0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, +0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, +0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7, +0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, +0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, +0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, +0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81, +0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, +0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84, +0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, +0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, +0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, +0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e, +0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, +0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, +0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, +0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28, +0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, +0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f, +0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, +0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, +0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, +0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69, +0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, 
+0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, +0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, +0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693, +0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, +0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d +}; diff --git a/src/include/access/htup.h b/src/include/access/htup.h index 56631043e3b532de9c5e67fe41ec313218bc8f72..8bf67486f7ec4df36331329ad8a2b31feb993665 100644 --- a/src/include/access/htup.h +++ b/src/include/access/htup.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: htup.h,v 1.42 2000/12/27 23:59:13 tgl Exp $ + * $Id: htup.h,v 1.43 2000/12/28 13:00:25 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -71,6 +71,13 @@ typedef HeapTupleHeaderData *HeapTupleHeader; #define XLOG_HEAP_DELETE 0x10 #define XLOG_HEAP_UPDATE 0x20 #define XLOG_HEAP_MOVE 0x30 +#define XLOG_HEAP_CLEAN 0x40 +#define XLOG_HEAP_OPMASK 0x70 +/* + * When we insert 1st item on new page in INSERT/UPDATE + * we can (and we do) restore entire page in redo + */ +#define XLOG_HEAP_INIT_PAGE 0x80 /* * All what we need to find changed tuple (18 bytes) @@ -78,13 +85,10 @@ typedef HeapTupleHeaderData *HeapTupleHeader; typedef struct xl_heaptid { RelFileNode node; - CommandId cid; /* this is for "better" tuple' */ - /* identification - it allows to avoid */ - /* "compensation" records for undo */ ItemPointerData tid; /* changed tuple id */ } xl_heaptid; -/* This is what we need to know about delete - ALIGN(18) = 24 bytes */ +/* This is what we need to know about delete */ typedef struct xl_heap_delete { xl_heaptid target; /* deleted tuple id */ @@ -92,35 +96,44 @@ typedef struct xl_heap_delete #define SizeOfHeapDelete (offsetof(xl_heaptid, tid) + SizeOfIptrData) -/* This is what we need to know about insert - 26 + data 
*/ -typedef struct xl_heap_insert +typedef struct xl_heap_header { - xl_heaptid target; /* inserted tuple id */ - /* something from tuple header */ - int16 t_natts; Oid t_oid; + int16 t_natts; uint8 t_hoff; uint8 mask; /* low 8 bits of t_infomask */ - /* TUPLE DATA FOLLOWS AT END OF STRUCT */ +} xl_heap_header; + +#define SizeOfHeapHeader (offsetof(xl_heap_header, mask) + sizeof(uint8)) + +/* This is what we need to know about insert */ +typedef struct xl_heap_insert +{ + xl_heaptid target; /* inserted tuple id */ + /* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_insert; -#define SizeOfHeapInsert (offsetof(xl_heap_insert, mask) + sizeof(uint8)) +#define SizeOfHeapInsert (offsetof(xl_heaptid, tid) + SizeOfIptrData) -/* This is what we need to know about update|move - 32|36 + data */ +/* This is what we need to know about update|move */ typedef struct xl_heap_update { xl_heaptid target; /* deleted tuple id */ ItemPointerData newtid; /* new inserted tuple id */ - /* something from header of new tuple version */ - Oid t_oid; - int16 t_natts; - uint8 t_hoff; - uint8 mask; /* low 8 bits of t_infomask */ - /* NEW TUPLE DATA FOLLOWS AT END OF STRUCT */ - /* (AFTER XMAX FOR MOVE OP) */ + /* NEW TUPLE xl_heap_header (XMIN & XMAX FOR MOVE OP) */ + /* and TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_update; -#define SizeOfHeapUpdate (offsetof(xl_heap_update, mask) + sizeof(uint8)) +#define SizeOfHeapUpdate (offsetof(xl_heap_update, newtid) + SizeOfIptrData) + +/* This is what we need to know about page cleanup */ +typedef struct xl_heap_clean +{ + RelFileNode node; + BlockNumber block; +} xl_heap_clean; + +#define SizeOfHeapClean (offsetof(xl_heap_clean, block) + sizeof(BlockNumber)) /* * MaxTupleSize is the maximum allowed size of a tuple, including header and diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index fa3326d4c39db2cb9eb52ca685cab2ff4326bf46..08d8daaea4d9f26560bf49687cb380f6ceae88e5 100644 --- 
a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: nbtree.h,v 1.48 2000/11/30 08:46:25 vadim Exp $ + * $Id: nbtree.h,v 1.49 2000/12/28 13:00:25 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -217,8 +217,10 @@ typedef BTStackData *BTStack; /* goes to the left sibling */ #define XLOG_BTREE_NEWROOT 0x40 /* new root page */ +#define XLOG_BTREE_LEAF 0x80 /* leaf/internal page was changed */ + /* - * All what we need to find changed index tuple (14 bytes) + * All what we need to find changed index tuple */ typedef struct xl_btreetid { @@ -227,7 +229,7 @@ typedef struct xl_btreetid } xl_btreetid; /* - * This is what we need to know about delete - ALIGN(14) = 18 bytes. + * This is what we need to know about delete */ typedef struct xl_btree_delete { @@ -237,39 +239,33 @@ typedef struct xl_btree_delete #define SizeOfBtreeDelete (offsetof(xl_btreetid, tid) + SizeOfIptrData) /* - * This is what we need to know about pure (without split) insert - - * 14 + [4+8] + btitem with key data. Note that we need in CommandID - * and HeapNode (4 + 8 bytes) only for leaf page insert. + * This is what we need to know about pure (without split) insert */ typedef struct xl_btree_insert { xl_btreetid target; /* inserted tuple id */ - /* [CommandID, HeapNode and ] BTITEM FOLLOWS AT END OF STRUCT */ + /* BTITEM FOLLOWS AT END OF STRUCT */ } xl_btree_insert; #define SizeOfBtreeInsert (offsetof(xl_btreetid, tid) + SizeOfIptrData) /* - * This is what we need to know about insert with split - - * 22 + {4 + 8 | left hi-key} + [btitem] + right sibling btitems. Note that - * we need in CommandID and HeapNode (4 + 8 bytes) for leaf pages - * and in left page hi-key for non-leaf ones. 
+ * On insert with split we save items of both left and right siblings + * and restore content of both pages from log record */ typedef struct xl_btree_split { xl_btreetid target; /* inserted tuple id */ BlockIdData otherblk; /* second block participated in split: */ /* first one is stored in target' tid */ + BlockIdData parentblk; /* parent block */ + BlockIdData leftblk; /* prev left block */ BlockIdData rightblk; /* next right block */ - /* - * We log all btitems from the right sibling. If new btitem goes on - * the left sibling then we log it too and it will be the first - * BTItemData at the end of this struct after CommandId and HeapNode - * on the leaf pages and left page hi-key on non-leaf ones. - */ + uint16 leftlen; /* len of left page items below */ + /* LEFT AND RIGHT PAGES ITEMS FOLLOW AT THE END */ } xl_btree_split; -#define SizeOfBtreeSplit (offsetof(xl_btree_split, rightblk) + sizeof(BlockIdData)) +#define SizeOfBtreeSplit (offsetof(xl_btree_split, leftlen) + sizeof(uint16)) /* * New root log record. 
@@ -277,6 +273,7 @@ typedef struct xl_btree_split typedef struct xl_btree_newroot { RelFileNode node; + int32 level; BlockIdData rootblk; /* 0 or 2 BTITEMS FOLLOW AT END OF STRUCT */ } xl_btree_newroot; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 1c10501fb7d5fc151194fda407853cd2937cfe43..88268b0b0b20ac614fb4987b38b3de042163a6e0 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -3,7 +3,7 @@ * * PostgreSQL transaction log manager * - * $Header: /cvsroot/pgsql/src/include/access/xlog.h,v 1.14 2000/12/18 00:44:48 tgl Exp $ + * $Header: /cvsroot/pgsql/src/include/access/xlog.h,v 1.15 2000/12/28 13:00:25 vadim Exp $ */ #ifndef XLOG_H #define XLOG_H @@ -13,12 +13,19 @@ #include "access/xlogdefs.h" #include "access/xlogutils.h" +typedef struct crc64 +{ + uint32 crc1; + uint32 crc2; +} crc64; + typedef struct XLogRecord { + crc64 xl_crc; XLogRecPtr xl_prev; /* ptr to previous record in log */ XLogRecPtr xl_xact_prev; /* ptr to previous record of this xact */ TransactionId xl_xid; /* xact id */ - uint16 xl_len; /* len of record *data* on this page */ + uint16 xl_len; /* total len of record *data* */ uint8 xl_info; RmgrId xl_rmid; /* resource manager inserted this record */ @@ -33,25 +40,30 @@ typedef struct XLogRecord ((char*)record + SizeOfXLogRecord) /* - * When there is no space on current page we continue on the next - * page with subrecord. + * When there is no space on current page we continue + * on the next page with subrecord. */ typedef struct XLogSubRecord { - uint16 xl_len; - uint8 xl_info; + uint16 xl_len; /* len of data left */ /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ } XLogSubRecord; -#define SizeOfXLogSubRecord DOUBLEALIGN(sizeof(XLogSubRecord)) +#define SizeOfXLogSubRecord DOUBLEALIGN(sizeof(XLogSubRecord)) /* - * XLOG uses only low 4 bits of xl_info. High 4 bits may be used - * by rmgr... + * XLOG uses only low 4 bits of xl_info. + * High 4 bits may be used by rmgr... 
+ * + * We support backup of 2 blocks per record only. + * If we backed up some of these blocks then we use + * flags below to signal rmgr about this on recovery. */ -#define XLR_TO_BE_CONTINUED 0x01 +#define XLR_SET_BKP_BLOCK(iblk) (0x08 >> iblk) +#define XLR_BKP_BLOCK_1 XLR_SET_BKP_BLOCK(0) /* 0x08 */ +#define XLR_BKP_BLOCK_2 XLR_SET_BKP_BLOCK(1) /* 0x04 */ #define XLR_INFO_MASK 0x0F /* @@ -72,6 +84,7 @@ typedef struct XLogPageHeaderData typedef XLogPageHeaderData *XLogPageHeader; +/* When record crosses page boundary */ #define XLP_FIRST_IS_SUBRECORD 0x0001 #define XLByteLT(left, right) \ @@ -100,9 +113,22 @@ typedef struct RmgrData extern RmgrData RmgrTable[]; -extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, - char *hdr, uint32 hdrlen, - char *buf, uint32 buflen); +/* + * List of these structs is used to pass data to XLOG. + * If buffer is valid then XLOG will check if buffer must + * be backup-ed. For backup-ed buffer data will not be + * inserted into record (and XLOG sets + * XLR_BKP_BLOCK_X bit in xl_info). 
+ */ +typedef struct XLogRecData +{ + Buffer buffer; /* buffer associated with this data */ + char *data; + uint32 len; + struct XLogRecData *next; +} XLogRecData; + +extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); extern void XLogFlush(XLogRecPtr RecPtr); extern void CreateCheckPoint(bool shutdown); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index aa3802db2c154ff5ae62ec8b0d3eeb7b21c2bbb9..1d4765cc662831924dc71e1710ef0a4e67b12b8a 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: catversion.h,v 1.66 2000/12/03 14:51:09 thomas Exp $ + * $Id: catversion.h,v 1.67 2000/12/28 13:00:27 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 200012030 +#define CATALOG_VERSION_NO 200012280 #endif diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 415364862b23190d4e37b2ddebe76b8376c8e011..547c0ec8b9abb9beccf6a9327d242ed2c1786717 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -46,12 +46,11 @@ typedef FormData_pg_sequence *Form_pg_sequence; /* XLOG stuff */ #define XLOG_SEQ_LOG 0x00 -#define XLOG_SEQ_SET 0x10 typedef struct xl_seq_rec { RelFileNode node; - int4 value; /* last logged value */ + /* SEQUENCE TUPLE DATA FOLLOWS AT THE END */ } xl_seq_rec; extern Datum nextval(PG_FUNCTION_ARGS); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 7f1906441a8ba7595d850afd90c4ca1325cf585b..cc995dd48471ae2f48e6f2d6a9915614586557eb 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1994, Regents of the University of 
California * - * $Id: bufmgr.h,v 1.47 2000/12/18 00:44:49 tgl Exp $ + * $Id: bufmgr.h,v 1.48 2000/12/28 13:00:29 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -183,6 +183,7 @@ extern void DropBuffers(Oid dbid); extern void PrintPinnedBufs(void); extern int BufferShmemSize(void); extern int ReleaseBuffer(Buffer buffer); +extern RelFileNode BufferGetFileNode(Buffer buffer); extern void SetBufferCommitInfoNeedsSave(Buffer buffer);