未验证 提交 47369d3c 编写于 作者: X xiong-gang 提交者: GitHub

Fix bitmap index recovery issue

Bitmap index doesn't write the backup page in xlog, so if system crashes
after checkpoint and the index is dropped, bitmap_redo will find the the
page is invalid and won't be able to recover.

For example:
create table bm_test (a int4, b int4);
create index bm_b_idx on bm_test using bitmap(b);
checkpoint;
insert into bm_test select i,i from generate_series(1,100000)i;
drop table bm_test;

pkill -9 postgres
gpstart -a
上级 5b508229
......@@ -865,7 +865,7 @@ _bitmap_log_bitmap_lastwords(Relation rel, Buffer lovBuffer,
{
xl_bm_bitmap_lastwords xlLastwords;
XLogRecPtr recptr;
XLogRecData rdata[1];
XLogRecData rdata[2];
xlLastwords.bm_node = rel->rd_node;
xlLastwords.bm_last_compword = lovItem->bm_last_compword;
......@@ -879,7 +879,13 @@ _bitmap_log_bitmap_lastwords(Relation rel, Buffer lovBuffer,
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char*)&xlLastwords;
rdata[0].len = sizeof(xl_bm_bitmap_lastwords);
rdata[0].next = NULL;
rdata[0].next = &(rdata[1]);
rdata[1].buffer = lovBuffer;
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].buffer_std = true;
rdata[1].next = NULL;
recptr = XLogInsert(RM_BITMAP_ID, XLOG_BITMAP_INSERT_BITMAP_LASTWORDS,
rdata);
......@@ -898,7 +904,7 @@ _bitmap_log_lovitem(Relation rel, ForkNumber fork, Buffer lovBuffer, OffsetNumbe
xl_bm_lovitem xlLovItem;
XLogRecPtr recptr;
XLogRecData rdata[1];
XLogRecData rdata[3];
Assert(BufferGetBlockNumber(lovBuffer) > 0);
......@@ -912,7 +918,27 @@ _bitmap_log_lovitem(Relation rel, ForkNumber fork, Buffer lovBuffer, OffsetNumbe
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char*)&xlLovItem;
rdata[0].len = sizeof(xl_bm_lovitem);
rdata[0].next = NULL;
rdata[0].next = &(rdata[1]);
rdata[1].buffer = lovBuffer;
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].buffer_std = true;
if (!is_new_lov_blkno)
{
rdata[1].next = NULL;
}
else
{
rdata[1].next = &(rdata[2]);
rdata[2].buffer = metabuf;
rdata[2].data = NULL;
rdata[2].len = 0;
rdata[2].buffer_std = false;
rdata[2].next = NULL;
}
recptr = XLogInsert(RM_BITMAP_ID,
XLOG_BITMAP_INSERT_LOVITEM, rdata);
......@@ -943,7 +969,7 @@ _bitmap_log_bitmapwords(Relation rel, Buffer bitmapBuffer, Buffer lovBuffer,
BMBitmapOpaque bitmapPageOpaque;
xl_bm_bitmapwords *xlBitmapWords;
XLogRecPtr recptr;
XLogRecData rdata[1];
XLogRecData rdata[2];
uint64* lastTids;
BM_HRL_WORD* cwords;
BM_HRL_WORD* hwords;
......@@ -999,7 +1025,13 @@ _bitmap_log_bitmapwords(Relation rel, Buffer bitmapBuffer, Buffer lovBuffer,
rdata[0].data = (char*)xlBitmapWords;
rdata[0].len = MAXALIGN(sizeof(xl_bm_bitmapwords)) + MAXALIGN(lastTids_size) +
MAXALIGN(cwords_size) + MAXALIGN(hwords_size);
rdata[0].next = NULL;
rdata[0].next = &(rdata[1]);
rdata[1].buffer = lovBuffer;
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].buffer_std = true;
rdata[1].next = NULL;
recptr = XLogInsert(RM_BITMAP_ID, XLOG_BITMAP_INSERT_WORDS, rdata);
......@@ -1021,7 +1053,7 @@ _bitmap_log_updateword(Relation rel, Buffer bitmapBuffer, int word_no)
BMBitmap bitmap;
xl_bm_updateword xlBitmapWord;
XLogRecPtr recptr;
XLogRecData rdata[1];
XLogRecData rdata[2];
bitmapPage = BufferGetPage(bitmapBuffer);
bitmap = (BMBitmap) PageGetContentsMaxAligned(bitmapPage);
......@@ -1040,7 +1072,13 @@ _bitmap_log_updateword(Relation rel, Buffer bitmapBuffer, int word_no)
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char*)&xlBitmapWord;
rdata[0].len = sizeof(xl_bm_updateword);
rdata[0].next = NULL;
rdata[0].next = &(rdata[1]);
rdata[1].buffer = bitmapBuffer;
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].buffer_std = true;
rdata[1].next = NULL;
recptr = XLogInsert(RM_BITMAP_ID, XLOG_BITMAP_UPDATEWORD, rdata);
......@@ -1070,7 +1108,7 @@ _bitmap_log_updatewords(Relation rel,
xl_bm_updatewords xlBitmapWords;
XLogRecPtr recptr;
XLogRecData rdata[1];
XLogRecData rdata[4];
firstPage = BufferGetPage(firstBuffer);
......@@ -1118,7 +1156,47 @@ _bitmap_log_updatewords(Relation rel,
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char*)&xlBitmapWords;
rdata[0].len = sizeof(xl_bm_updatewords);
rdata[0].next = NULL;
rdata[0].next = &(rdata[1]);
rdata[1].buffer = firstBuffer;
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].buffer_std = true;
if (!BufferIsValid(secondBuffer))
{
rdata[1].next = NULL;
}
else
{
rdata[1].next = &(rdata[2]);
rdata[2].buffer = secondBuffer;
rdata[2].data = NULL;
rdata[2].len = 0;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
}
if (new_lastpage)
{
if (!BufferIsValid(secondBuffer))
{
rdata[1].next = &(rdata[2]);
rdata[2].buffer = lovBuffer;
rdata[2].data = NULL;
rdata[2].len = 0;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
}
else
{
rdata[2].next = &(rdata[3]);
rdata[3].buffer = lovBuffer;
rdata[3].data = NULL;
rdata[3].len = 0;
rdata[3].buffer_std = true;
rdata[3].next = NULL;
}
}
recptr = XLogInsert(RM_BITMAP_ID, XLOG_BITMAP_UPDATEWORDS, rdata);
......
......@@ -139,75 +139,90 @@ static void
_bitmap_xlog_insert_lovitem(XLogRecPtr lsn, XLogRecord *record)
{
xl_bm_lovitem *xlrec = (xl_bm_lovitem *) XLogRecGetData(record);
Buffer lovBuffer;
Page lovPage;
if (xlrec->bm_is_new_lov_blkno)
if (record->xl_info & XLR_BKP_BLOCK(0))
{
lovBuffer = XLogReadBufferExtended(xlrec->bm_node, xlrec->bm_fork,
xlrec->bm_lov_blkno, RBM_ZERO);
Assert(BufferIsValid(lovBuffer));
LockBuffer(lovBuffer, BUFFER_LOCK_EXCLUSIVE);
(void) RestoreBackupBlock(lsn, record, 0, false, false);
}
else
{
lovBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_lov_blkno, false);
if (!BufferIsValid(lovBuffer))
return;
}
Buffer lovBuffer;
Page lovPage;
if (xlrec->bm_is_new_lov_blkno)
{
lovBuffer = XLogReadBufferExtended(xlrec->bm_node, xlrec->bm_fork,
xlrec->bm_lov_blkno, RBM_ZERO);
Assert(BufferIsValid(lovBuffer));
LockBuffer(lovBuffer, BUFFER_LOCK_EXCLUSIVE);
}
else
{
lovBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_lov_blkno, false);
if (!BufferIsValid(lovBuffer))
return;
}
lovPage = BufferGetPage(lovBuffer);
lovPage = BufferGetPage(lovBuffer);
if (PageIsNew(lovPage))
_bitmap_init_lovpage(NULL, lovBuffer);
if (PageIsNew(lovPage))
_bitmap_init_lovpage(NULL, lovBuffer);
elog(DEBUG1, "In redo, processing a lovItem: (blockno, offset)=(%d,%d)",
xlrec->bm_lov_blkno, xlrec->bm_lov_offset);
elog(DEBUG1, "In redo, processing a lovItem: (blockno, offset)=(%d,%d)",
xlrec->bm_lov_blkno, xlrec->bm_lov_offset);
if (PageGetLSN(lovPage) < lsn)
{
OffsetNumber newOffset, itemSize;
if (PageGetLSN(lovPage) < lsn)
{
OffsetNumber newOffset, itemSize;
newOffset = OffsetNumberNext(PageGetMaxOffsetNumber(lovPage));
if (newOffset > xlrec->bm_lov_offset)
elog(PANIC, "_bitmap_xlog_insert_lovitem: LOV item is not inserted "
newOffset = OffsetNumberNext(PageGetMaxOffsetNumber(lovPage));
if (newOffset > xlrec->bm_lov_offset)
elog(PANIC, "_bitmap_xlog_insert_lovitem: LOV item is not inserted "
"in pos %d (requested %d)",
newOffset, xlrec->bm_lov_offset);
newOffset, xlrec->bm_lov_offset);
/*
* The value newOffset could be smaller than xlrec->bm_lov_offset because
* of aborted transactions.
*/
if (newOffset < xlrec->bm_lov_offset)
{
_bitmap_relbuf(lovBuffer);
return;
}
/*
* The value newOffset could be smaller than xlrec->bm_lov_offset because
* of aborted transactions.
*/
if (newOffset < xlrec->bm_lov_offset)
{
_bitmap_relbuf(lovBuffer);
return;
}
itemSize = sizeof(BMLOVItemData);
if (itemSize > PageGetFreeSpace(lovPage))
elog(PANIC,
"_bitmap_xlog_insert_lovitem: not enough space in LOV page %d",
xlrec->bm_lov_blkno);
if (PageAddItem(lovPage, (Item)&(xlrec->bm_lovItem), itemSize,
itemSize = sizeof(BMLOVItemData);
if (itemSize > PageGetFreeSpace(lovPage))
elog(PANIC,
"_bitmap_xlog_insert_lovitem: not enough space in LOV page %d",
xlrec->bm_lov_blkno);
if (PageAddItem(lovPage, (Item)&(xlrec->bm_lovItem), itemSize,
newOffset, false, false) == InvalidOffsetNumber)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("_bitmap_xlog_insert_lovitem: failed to add LOV item")));
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("_bitmap_xlog_insert_lovitem: failed to add LOV item")));
PageSetLSN(lovPage, lsn);
PageSetLSN(lovPage, lsn);
_bitmap_wrtbuf(lovBuffer);
_bitmap_wrtbuf(lovBuffer);
}
else
_bitmap_relbuf(lovBuffer);
}
/* Update the meta page when needed */
if (!xlrec->bm_is_new_lov_blkno)
return;
if (record->xl_info & XLR_BKP_BLOCK(1))
{
(void) RestoreBackupBlock(lsn, record, 1, false, false);
}
else
_bitmap_relbuf(lovBuffer);
/* Update the meta page when needed */
if (xlrec->bm_is_new_lov_blkno)
{
Buffer metabuf;
BMMetaPage metapage;
{
BMMetaPage metapage;
Buffer metabuf;
metabuf = XLogReadBufferExtended(xlrec->bm_node, xlrec->bm_fork,
BM_METAPAGE, RBM_ZERO);
......@@ -215,21 +230,21 @@ _bitmap_xlog_insert_lovitem(XLogRecPtr lsn, XLogRecord *record)
return;
LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
metapage = (BMMetaPage) PageGetContents(BufferGetPage(metabuf));
if (PageGetLSN(BufferGetPage(metabuf)) < lsn)
{
metapage->bm_lov_lastpage = xlrec->bm_lov_blkno;
PageSetLSN(BufferGetPage(metabuf), lsn);
_bitmap_wrtbuf(metabuf);
}
else
{
_bitmap_relbuf(metabuf);
}
}
metapage = (BMMetaPage) PageGetContents(BufferGetPage(metabuf));
if (PageGetLSN(BufferGetPage(metabuf)) < lsn)
{
metapage->bm_lov_lastpage = xlrec->bm_lov_blkno;
PageSetLSN(BufferGetPage(metabuf), lsn);
_bitmap_wrtbuf(metabuf);
}
else
{
_bitmap_relbuf(metabuf);
}
}
}
/*
......@@ -282,6 +297,13 @@ _bitmap_xlog_insert_bitmap_lastwords(XLogRecPtr lsn,
Page lovPage;
BMLOVItem lovItem;
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
}
xlrec = (xl_bm_bitmap_lastwords *) XLogRecGetData(record);
lovBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_lov_blkno, false);
......@@ -430,6 +452,12 @@ _bitmap_xlog_insert_bitmapwords(XLogRecPtr lsn, XLogRecord *record)
}
/* Update lovPage when needed */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
}
lovBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_lov_blkno, false);
if (!BufferIsValid(lovBuffer))
return;
......@@ -456,7 +484,6 @@ _bitmap_xlog_insert_bitmapwords(XLogRecPtr lsn, XLogRecord *record)
_bitmap_wrtbuf(lovBuffer);
}
else if (xlrec->bm_is_first && PageGetLSN(lovPage) < lsn)
{
lovItem->bm_lov_head = xlrec->bm_blkno;
......@@ -466,7 +493,6 @@ _bitmap_xlog_insert_bitmapwords(XLogRecPtr lsn, XLogRecord *record)
_bitmap_wrtbuf(lovBuffer);
}
else
{
_bitmap_relbuf(lovBuffer);
......@@ -490,6 +516,12 @@ _bitmap_xlog_updateword(XLogRecPtr lsn, XLogRecord *record)
xlrec->bm_word_no, xlrec->bm_cword,
xlrec->bm_hword);
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
}
bitmapBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_blkno, false);
if (BufferIsValid(bitmapBuffer))
{
......@@ -517,115 +549,138 @@ static void
_bitmap_xlog_updatewords(XLogRecPtr lsn, XLogRecord *record)
{
xl_bm_updatewords *xlrec;
Buffer firstBuffer;
Buffer secondBuffer = InvalidBuffer;
Page firstPage;
Page secondPage = NULL;
BMBitmapOpaque firstOpaque;
BMBitmapOpaque secondOpaque = NULL;
BMBitmap firstBitmap;
BMBitmap secondBitmap = NULL;
xlrec = (xl_bm_updatewords *) XLogRecGetData(record);
elog(DEBUG1, "_bitmap_xlog_updatewords: (first_blkno, num_cwords, last_tid, next_blkno)="
"(%d, " INT64_FORMAT ", " INT64_FORMAT ", %d), (second_blkno, num_cwords, last_tid, next_blkno)="
"(%d, " INT64_FORMAT ", " INT64_FORMAT ", %d)",
xlrec->bm_first_blkno, xlrec->bm_first_num_cwords, xlrec->bm_first_last_tid,
xlrec->bm_two_pages ? xlrec->bm_second_blkno : xlrec->bm_next_blkno,
xlrec->bm_second_blkno, xlrec->bm_second_num_cwords,
xlrec->bm_second_last_tid, xlrec->bm_next_blkno);
firstBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_first_blkno, false);
if (BufferIsValid(firstBuffer))
"(%d, " INT64_FORMAT ", " INT64_FORMAT ", %d), (second_blkno, num_cwords, last_tid, next_blkno)="
"(%d, " INT64_FORMAT ", " INT64_FORMAT ", %d)",
xlrec->bm_first_blkno, xlrec->bm_first_num_cwords, xlrec->bm_first_last_tid,
xlrec->bm_two_pages ? xlrec->bm_second_blkno : xlrec->bm_next_blkno,
xlrec->bm_second_blkno, xlrec->bm_second_num_cwords,
xlrec->bm_second_last_tid, xlrec->bm_next_blkno);
if (record->xl_info & XLR_BKP_BLOCK(0))
{
firstPage = BufferGetPage(firstBuffer);
firstOpaque =
(BMBitmapOpaque)PageGetSpecialPointer(firstPage);
firstBitmap = (BMBitmap) PageGetContentsMaxAligned(firstPage);
(void) RestoreBackupBlock(lsn, record, 0, false, false);
}
else
{
Buffer firstBuffer;
Page firstPage;
BMBitmapOpaque firstOpaque;
BMBitmap firstBitmap;
if (PageGetLSN(firstPage) < lsn)
firstBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_first_blkno, false);
if (BufferIsValid(firstBuffer))
{
memcpy(firstBitmap->cwords, xlrec->bm_first_cwords,
BM_NUM_OF_HRL_WORDS_PER_PAGE * sizeof(BM_HRL_WORD));
memcpy(firstBitmap->hwords, xlrec->bm_first_hwords,
BM_NUM_OF_HEADER_WORDS * sizeof(BM_HRL_WORD));
firstOpaque->bm_hrl_words_used = xlrec->bm_first_num_cwords;
firstOpaque->bm_last_tid_location = xlrec->bm_first_last_tid;
if (xlrec->bm_two_pages)
firstOpaque->bm_bitmap_next = xlrec->bm_second_blkno;
firstPage = BufferGetPage(firstBuffer);
firstOpaque =
(BMBitmapOpaque)PageGetSpecialPointer(firstPage);
firstBitmap = (BMBitmap) PageGetContentsMaxAligned(firstPage);
if (PageGetLSN(firstPage) < lsn)
{
memcpy(firstBitmap->cwords, xlrec->bm_first_cwords,
BM_NUM_OF_HRL_WORDS_PER_PAGE * sizeof(BM_HRL_WORD));
memcpy(firstBitmap->hwords, xlrec->bm_first_hwords,
BM_NUM_OF_HEADER_WORDS * sizeof(BM_HRL_WORD));
firstOpaque->bm_hrl_words_used = xlrec->bm_first_num_cwords;
firstOpaque->bm_last_tid_location = xlrec->bm_first_last_tid;
if (xlrec->bm_two_pages)
firstOpaque->bm_bitmap_next = xlrec->bm_second_blkno;
else
firstOpaque->bm_bitmap_next = xlrec->bm_next_blkno;
PageSetLSN(firstPage, lsn);
_bitmap_wrtbuf(firstBuffer);
}
else
firstOpaque->bm_bitmap_next = xlrec->bm_next_blkno;
_bitmap_relbuf(firstBuffer);
}
}
PageSetLSN(firstPage, lsn);
_bitmap_wrtbuf(firstBuffer);
/* Update secondPage when needed */
if (xlrec->bm_two_pages)
{
if (record->xl_info & XLR_BKP_BLOCK(1))
{
(void) RestoreBackupBlock(lsn, record, 1, false, false);
}
else
_bitmap_relbuf(firstBuffer);
{
Buffer secondBuffer = InvalidBuffer;
Page secondPage = NULL;
BMBitmapOpaque secondOpaque = NULL;
BMBitmap secondBitmap = NULL;
secondBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_second_blkno, true);
secondPage = BufferGetPage(secondBuffer);
if (PageIsNew(secondPage))
_bitmap_init_bitmappage(NULL, secondBuffer);
secondOpaque = (BMBitmapOpaque)PageGetSpecialPointer(secondPage);
secondBitmap = (BMBitmap) PageGetContentsMaxAligned(secondPage);
if (PageGetLSN(secondPage) < lsn)
{
memcpy(secondBitmap->cwords, xlrec->bm_second_cwords,
BM_NUM_OF_HRL_WORDS_PER_PAGE * sizeof(BM_HRL_WORD));
memcpy(secondBitmap->hwords, xlrec->bm_second_hwords,
BM_NUM_OF_HEADER_WORDS * sizeof(BM_HRL_WORD));
secondOpaque->bm_hrl_words_used = xlrec->bm_second_num_cwords;
secondOpaque->bm_last_tid_location = xlrec->bm_second_last_tid;
secondOpaque->bm_bitmap_next = xlrec->bm_next_blkno;
PageSetLSN(secondPage, lsn);
_bitmap_wrtbuf(secondBuffer);
}
else
{
_bitmap_relbuf(secondBuffer);
}
}
}
/* Update secondPage when needed */
if (xlrec->bm_two_pages)
{
secondBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_second_blkno, true);
secondPage = BufferGetPage(secondBuffer);
if (PageIsNew(secondPage))
_bitmap_init_bitmappage(NULL, secondBuffer);
secondOpaque = (BMBitmapOpaque)PageGetSpecialPointer(secondPage);
secondBitmap = (BMBitmap) PageGetContentsMaxAligned(secondPage);
if (PageGetLSN(secondPage) < lsn)
{
memcpy(secondBitmap->cwords, xlrec->bm_second_cwords,
BM_NUM_OF_HRL_WORDS_PER_PAGE * sizeof(BM_HRL_WORD));
memcpy(secondBitmap->hwords, xlrec->bm_second_hwords,
BM_NUM_OF_HEADER_WORDS * sizeof(BM_HRL_WORD));
secondOpaque->bm_hrl_words_used = xlrec->bm_second_num_cwords;
secondOpaque->bm_last_tid_location = xlrec->bm_second_last_tid;
secondOpaque->bm_bitmap_next = xlrec->bm_next_blkno;
PageSetLSN(secondPage, lsn);
_bitmap_wrtbuf(secondBuffer);
}
else
{
_bitmap_relbuf(secondBuffer);
}
}
/* Update lovPage when needed */
if (xlrec->bm_new_lastpage)
{
Buffer lovBuffer;
Page lovPage;
BMLOVItem lovItem;
lovBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_lov_blkno, false);
if (!BufferIsValid(lovBuffer))
return;
lovPage = BufferGetPage(lovBuffer);
if (PageGetLSN(lovPage) < lsn)
{
lovItem = (BMLOVItem)
PageGetItem(lovPage,
PageGetItemId(lovPage, xlrec->bm_lov_offset));
lovItem->bm_lov_tail = xlrec->bm_second_blkno;
PageSetLSN(lovPage, lsn);
_bitmap_wrtbuf(lovBuffer);
}
else
{
_bitmap_relbuf(lovBuffer);
}
}
/* Update lovPage when needed */
if (xlrec->bm_new_lastpage)
{
Buffer lovBuffer;
Page lovPage;
BMLOVItem lovItem;
int bkpNo = xlrec->bm_two_pages ? 2 : 1;
if (record->xl_info & XLR_BKP_BLOCK(bkpNo))
{
(void) RestoreBackupBlock(lsn, record, bkpNo, false, false);
}
else
{
lovBuffer = XLogReadBuffer(xlrec->bm_node, xlrec->bm_lov_blkno, false);
if (!BufferIsValid(lovBuffer))
return;
lovPage = BufferGetPage(lovBuffer);
if (PageGetLSN(lovPage) < lsn)
{
lovItem = (BMLOVItem)
PageGetItem(lovPage,
PageGetItemId(lovPage, xlrec->bm_lov_offset));
lovItem->bm_lov_tail = xlrec->bm_second_blkno;
PageSetLSN(lovPage, lsn);
_bitmap_wrtbuf(lovBuffer);
}
else
{
_bitmap_relbuf(lovBuffer);
}
}
}
}
void
......
......@@ -577,3 +577,83 @@ select * from unlogged_test where c1 = 100;
(1 row)
drop table unlogged_test;
--
-- Test crash recovery
--
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
CREATE TABLE bm_test_insert(a int) DISTRIBUTED BY (a);
CREATE INDEX bm_a_idx ON bm_test_insert USING bitmap(a);
CREATE TABLE bm_test_update(a int, b int) DISTRIBUTED BY (a);
CREATE INDEX bm_b_idx ON bm_test_update USING bitmap(b);
INSERT INTO bm_test_update SELECT i,i FROM generate_series (1, 10000) i;
-- flush the data to disk
CHECKPOINT;
-- skip all further checkpoint
SELECT gp_inject_fault_infinite('checkpoint', 'skip', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg0 172.17.0.10:25435 pid=355751)
NOTICE: Success: (seg1 172.17.0.10:25433 pid=355752)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=355753)
gp_inject_fault_infinite
--------------------------
t
t
t
(3 rows)
INSERT INTO bm_test_insert SELECT generate_series (1, 10000);
UPDATE bm_test_update SET b=b+1;
-- trigger recovery on primaries
SELECT gp_inject_fault_infinite('finish_prepared_after_record_commit_prepared', 'panic', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg0 172.17.0.10:25435 pid=355751)
NOTICE: Success: (seg1 172.17.0.10:25433 pid=355752)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=355753)
gp_inject_fault_infinite
--------------------------
t
t
t
(3 rows)
SET client_min_messages='ERROR';
CREATE TABLE trigger_recovery_on_primaries(c int);
RESET client_min_messages;
-- reconnect to the database after restart
\c
SELECT gp_inject_fault('checkpoint', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg1 172.17.0.10:25433 pid=355829)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=355830)
NOTICE: Success: (seg0 172.17.0.10:25435 pid=355828)
gp_inject_fault
-----------------
t
t
t
(3 rows)
SELECT gp_inject_fault('finish_prepared_after_record_commit_prepared', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg1 172.17.0.10:25433 pid=355829)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=355830)
NOTICE: Success: (seg0 172.17.0.10:25435 pid=355828)
gp_inject_fault
-----------------
t
t
t
(3 rows)
SET enable_seqscan=off;
SET enable_indexscan=off;
SELECT * FROM bm_test_insert WHERE a=1;
a
---
1
(1 row)
SELECT * FROM bm_test_update WHERE b=1;
a | b
---+---
(0 rows)
DROP TABLE trigger_recovery_on_primaries;
DROP TABLE bm_test_insert;
DROP TABLE bm_test_update;
......@@ -579,3 +579,83 @@ select * from unlogged_test where c1 = 100;
(1 row)
drop table unlogged_test;
--
-- Test crash recovery
--
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
CREATE TABLE bm_test_insert(a int) DISTRIBUTED BY (a);
CREATE INDEX bm_a_idx ON bm_test_insert USING bitmap(a);
CREATE TABLE bm_test_update(a int, b int) DISTRIBUTED BY (a);
CREATE INDEX bm_b_idx ON bm_test_update USING bitmap(b);
INSERT INTO bm_test_update SELECT i,i FROM generate_series (1, 10000) i;
-- flush the data to disk
CHECKPOINT;
-- skip all further checkpoint
SELECT gp_inject_fault_infinite('checkpoint', 'skip', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg0 172.17.0.10:25435 pid=69866)
NOTICE: Success: (seg1 172.17.0.10:25433 pid=69867)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=69868)
gp_inject_fault_infinite
--------------------------
t
t
t
(3 rows)
INSERT INTO bm_test_insert SELECT generate_series (1, 10000);
UPDATE bm_test_update SET b=b+1;
-- trigger recovery on primaries
SELECT gp_inject_fault_infinite('finish_prepared_after_record_commit_prepared', 'panic', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg0 172.17.0.10:25435 pid=69866)
NOTICE: Success: (seg1 172.17.0.10:25433 pid=69867)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=69868)
gp_inject_fault_infinite
--------------------------
t
t
t
(3 rows)
SET client_min_messages='ERROR';
CREATE TABLE trigger_recovery_on_primaries(c int);
RESET client_min_messages;
-- reconnect to the database after restart
\c
SELECT gp_inject_fault('checkpoint', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg1 172.17.0.10:25433 pid=69945)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=69946)
NOTICE: Success: (seg0 172.17.0.10:25435 pid=69944)
gp_inject_fault
-----------------
t
t
t
(3 rows)
SELECT gp_inject_fault('finish_prepared_after_record_commit_prepared', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
NOTICE: Success: (seg1 172.17.0.10:25433 pid=69945)
NOTICE: Success: (seg2 172.17.0.10:25434 pid=69946)
NOTICE: Success: (seg0 172.17.0.10:25435 pid=69944)
gp_inject_fault
-----------------
t
t
t
(3 rows)
SET enable_seqscan=off;
SET enable_indexscan=off;
SELECT * FROM bm_test_insert WHERE a=1;
a
---
1
(1 row)
SELECT * FROM bm_test_update WHERE b=1;
a | b
---+---
(0 rows)
DROP TABLE trigger_recovery_on_primaries;
DROP TABLE bm_test_insert;
DROP TABLE bm_test_update;
......@@ -27,7 +27,9 @@ test: python_processed64bit
test: leastsquares opr_sanity_gp decode_expr bitmapscan bitmapscan_ao case_gp limit_gp notin percentile join_gp union_gp gpcopy gp_create_table gp_create_view window_views
test: filter gpctas gpdist matrix toast sublink table_functions olap_setup complex opclass_ddl information_schema guc_env_var guc_gp gp_explain distributed_transactions explain_format
test: bitmap_index gp_dump_query_oids analyze gp_owner_permission incremental_analyze
# bitmap_index triggers recovery, run it seperately
test: bitmap_index
test: gp_dump_query_oids analyze gp_owner_permission incremental_analyze
test: indexjoin as_alias regex_gp gpparams with_clause transient_types gp_rules
# dispatch should always run seperately from other cases.
test: dispatch
......
......@@ -250,3 +250,35 @@ analyze unlogged_test;
explain select * from unlogged_test where c1 = 100;
select * from unlogged_test where c1 = 100;
drop table unlogged_test;
--
-- Test crash recovery
--
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
CREATE TABLE bm_test_insert(a int) DISTRIBUTED BY (a);
CREATE INDEX bm_a_idx ON bm_test_insert USING bitmap(a);
CREATE TABLE bm_test_update(a int, b int) DISTRIBUTED BY (a);
CREATE INDEX bm_b_idx ON bm_test_update USING bitmap(b);
INSERT INTO bm_test_update SELECT i,i FROM generate_series (1, 10000) i;
-- flush the data to disk
CHECKPOINT;
-- skip all further checkpoint
SELECT gp_inject_fault_infinite('checkpoint', 'skip', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
INSERT INTO bm_test_insert SELECT generate_series (1, 10000);
UPDATE bm_test_update SET b=b+1;
-- trigger recovery on primaries
SELECT gp_inject_fault_infinite('finish_prepared_after_record_commit_prepared', 'panic', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
SET client_min_messages='ERROR';
CREATE TABLE trigger_recovery_on_primaries(c int);
RESET client_min_messages;
-- reconnect to the database after restart
\c
SELECT gp_inject_fault('checkpoint', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
SELECT gp_inject_fault('finish_prepared_after_record_commit_prepared', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content > -1;
SET enable_seqscan=off;
SET enable_indexscan=off;
SELECT * FROM bm_test_insert WHERE a=1;
SELECT * FROM bm_test_update WHERE b=1;
DROP TABLE trigger_recovery_on_primaries;
DROP TABLE bm_test_insert;
DROP TABLE bm_test_update;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册