diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index 37d04cfb7e99433ee8b5f708c50b05842e7415bb..b7b89e4e49d943fd7387bfc07618a0442a391d32 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -121,22 +121,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* first, forget any incomplete split this insertion completes */ + if (data->isData) + { + Assert(data->isDelete == FALSE); + if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + { + PostingItem *pitem; + + pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + forgetIncompleteSplit(data->node, + PostingItemGetBlockNumber(pitem), + data->updateBlkno); + } + } + else + { + if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + { + IndexTuple itup; + + itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + forgetIncompleteSplit(data->node, + GinItemPointerGetBlockNumber(&itup->t_tid), + data->updateBlkno); + } + } + /* nothing else to do if page was backed up */ if (record->xl_info & XLR_BKP_BLOCK_1) return; reln = XLogOpenRelation(data->node); buffer = XLogReadBuffer(reln, data->blkno, false); - Assert(BufferIsValid(buffer)); + if (!BufferIsValid(buffer)) + return; page = (Page) BufferGetPage(buffer); - if (data->isData) + if (!XLByteLE(lsn, PageGetLSN(page))) { - Assert(data->isDelete == FALSE); - Assert(GinPageIsData(page)); - - if (!XLByteLE(lsn, PageGetLSN(page))) + if (data->isData) { + Assert(GinPageIsData(page)); + if (data->isLeaf) { OffsetNumber i; @@ -166,23 +193,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) GinDataPageAddItem(page, pitem, data->offset); } } - - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + else { - PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - - forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno); - } + IndexTuple itup; - } - else - { - IndexTuple itup; + Assert(!GinPageIsData(page)); - Assert(!GinPageIsData(page)); - - if (!XLByteLE(lsn, PageGetLSN(page))) - { if (data->updateBlkno != InvalidBlockNumber) { /* update link to right page after split */ @@ -203,23 +219,15 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber) elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); + data->node.spcNode, data->node.dbNode, data->node.relNode); } - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) - { - itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno); - } - } - - if (!XLByteLE(lsn, PageGetLSN(page))) - { PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); } + UnlockReleaseBuffer(buffer); } @@ -241,7 +249,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) if (data->isData) flags |= GIN_DATA; - lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit); + lbuffer = XLogReadBuffer(reln, data->lblkno, true); Assert(BufferIsValid(lbuffer)); lpage = (Page) BufferGetPage(lbuffer); GinInitBuffer(lbuffer, flags); @@ -318,7 +326,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) if (data->isRootSplit) { - Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, false); + Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, true); Page rootPage = BufferGetPage(rootBuf); GinInitBuffer(rootBuf, flags & ~GIN_LEAF); @@ -355,46 +363,51 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; - /* nothing else to do if page was backed up (and no info to do it with) */ + /* nothing to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; reln = XLogOpenRelation(data->node); buffer = XLogReadBuffer(reln, data->blkno, false); - Assert(BufferIsValid(buffer)); + if (!BufferIsValid(buffer)) + return; page = (Page) BufferGetPage(buffer); - if (GinPageIsData(page)) - { - memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage), - GinSizeOfItem(page) *data->nitem); - GinPageGetOpaque(page)->maxoff = data->nitem; - } - else + if (!XLByteLE(lsn, PageGetLSN(page))) { - OffsetNumber i, - *tod; - IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage)); + if (GinPageIsData(page)) + { + memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage), + GinSizeOfItem(page) *data->nitem); + GinPageGetOpaque(page)->maxoff = data->nitem; + } + else + { + OffsetNumber i, + *tod; + IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage)); - tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page)); - for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++) - tod[i - 1] = i; + tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page)); + for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++) + tod[i - 1] = i; - PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page)); + PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page)); - for (i = 0; i < data->nitem; i++) - { - if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); - itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); + for (i = 0; i < data->nitem; i++) + { + if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) + elog(ERROR, "failed to add item to index page in %u/%u/%u", + data->node.spcNode, data->node.dbNode, data->node.relNode); + itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); + } } - } - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + + MarkBufferDirty(buffer); + } - MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } @@ -411,38 +424,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) if (!(record->xl_info & XLR_BKP_BLOCK_1)) { buffer = XLogReadBuffer(reln, data->blkno, false); - page = BufferGetPage(buffer); - Assert(GinPageIsData(page)); - GinPageGetOpaque(page)->flags = GIN_DELETED; - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + { + page = BufferGetPage(buffer); + if (!XLByteLE(lsn, PageGetLSN(page))) + { + Assert(GinPageIsData(page)); + GinPageGetOpaque(page)->flags = GIN_DELETED; + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); + } } if (!(record->xl_info & XLR_BKP_BLOCK_2)) { buffer = XLogReadBuffer(reln, data->parentBlkno, false); - page = BufferGetPage(buffer); - Assert(GinPageIsData(page)); - Assert(!GinPageIsLeaf(page)); - PageDeletePostingItem(page, data->parentOffset); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + { + page = BufferGetPage(buffer); + if (!XLByteLE(lsn, PageGetLSN(page))) + { + Assert(GinPageIsData(page)); + Assert(!GinPageIsLeaf(page)); + PageDeletePostingItem(page, data->parentOffset); + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); + } } if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber) { buffer = XLogReadBuffer(reln, data->leftBlkno, false); - page = BufferGetPage(buffer); - Assert(GinPageIsData(page)); - GinPageGetOpaque(page)->rightlink = data->rightLink; - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + { + page = BufferGetPage(buffer); + if (!XLByteLE(lsn, PageGetLSN(page))) + { + Assert(GinPageIsData(page)); + GinPageGetOpaque(page)->rightlink = data->rightLink; + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); + } } } @@ -560,6 +591,13 @@ ginContinueSplit(ginIncompleteSplit *split) buffer = XLogReadBuffer(reln, split->leftBlkno, false); + /* + * Failure should be impossible here, because we wrote the page earlier. + */ + if (!BufferIsValid(buffer)) + elog(PANIC, "ginContinueSplit: left block %u not found", + split->leftBlkno); + if (split->rootBlkno == GIN_ROOT_BLKNO) { prepareEntryScan(&btree, reln, (Datum) 0, NULL);