提交 aa459523 编写于 作者: A Ashwin Agrawal

Add WAL consistency checking facility.

This is largely a backport of the following Postgres commit.
------------------------------------------------
commit a507b869
Author: Robert Haas <rhaas@postgresql.org>
Date:   Wed Feb 8 15:45:30 2017 -0500

    Add WAL consistency checking facility.

    When the new GUC wal_consistency_checking is set to a non-empty value,
    it triggers recording of additional full-page images, which are
    compared on the standby against the results of applying the WAL record
    (without regard to those full-page images).  Allowable differences
    such as hints are masked out, and the resulting pages are compared;
    any difference results in a FATAL error on the standby.

    Kuntal Ghosh, based on earlier patches by Michael Paquier and Heikki
    Linnakangas.  Extensively reviewed and revised by Michael Paquier and
    by me, with additional reviews and comments from Amit Kapila, Álvaro
    Herrera, Simon Riggs, and Peter Eisentraut.
------------------------------------------------

It has been modified to work with Greenplum's current xlog format, which differs
from the Postgres code at the time this patch was committed. The main changes
adapt it to the current backup-block format in xlog records. Also, some masking
routines differ from upstream, since some of the masked flags will only become
relevant once Greenplum catches up to later versions of Postgres.
上级 7114d535
......@@ -585,9 +585,10 @@ print_backup_blocks(XLogRecPtr cur, XLogRecord *rec)
getSpaceName(bkb.node.spcNode, spaceName, sizeof(spaceName));
getDbName(bkb.node.dbNode, dbName, sizeof(dbName));
getRelName(bkb.node.relNode, relName, sizeof(relName));
snprintf(buf, sizeof(buf), "bkpblock[%d]: s/d/r:%s/%s/%s blk:%u hole_off/len:%u/%u\n",
i+1, spaceName, dbName, relName,
bkb.block, bkb.hole_offset, bkb.hole_length);
snprintf(buf, sizeof(buf), "bkpblock[%d]: s/d/r:%s/%s/%s blk:%u hole_off/len:%u/%u apply:%d\n",
i+1, spaceName, dbName, relName,
bkb.block, bkb.hole_offset, bkb.hole_length,
(bkb.block_info & BLOCK_APPLY) != 0);
blk += sizeof(BkpBlock) + (BLCKSZ - bkb.hole_length);
if (!enable_stats)
......
......@@ -12,6 +12,6 @@ subdir = src/backend/access/common
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = memtuple.o heaptuple.o indextuple.o printtup.o reloptions.o scankey.o tupdesc.o
OBJS = bufmask.o memtuple.o heaptuple.o indextuple.o printtup.o reloptions.o scankey.o tupdesc.o
include $(top_srcdir)/src/backend/common.mk
/*-------------------------------------------------------------------------
*
* bufmask.c
* Routines for buffer masking. Used to mask certain bits
* in a page which can be different when the WAL is generated
* and when the WAL is applied.
*
* Portions Copyright (c) 2016, PostgreSQL Global Development Group
*
* Contains common routines required for masking a page.
*
* IDENTIFICATION
* src/backend/storage/buffer/bufmask.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/bufmask.h"
/*
 * mask_page_lsn_and_checksum
 *
 * In consistency checks, the LSN of the two pages compared will likely be
 * different because of concurrent operations when the WAL is generated
 * and the state of the page when WAL is applied.  Likewise, the checksum
 * is computed over the page contents and therefore differs whenever any
 * other masked field differs, so it is stamped out as well.
 */
void
mask_page_lsn_and_checksum(Page page)
{
	PageHeader	phdr = (PageHeader) page;

	/* Overwrite both fields with MASK_MARKER so they always compare equal. */
	PageXLogRecPtrSet(phdr->pd_lsn, (uint64) MASK_MARKER);
	phdr->pd_checksum = MASK_MARKER;
}
/*
 * mask_page_hint_bits
 *
 * Mask hint bits in PageHeader.  We want to ignore differences in hint bits,
 * since they can be set by the master without emitting any WAL record.
 */
void
mask_page_hint_bits(Page page)
{
	PageHeader	phdr = (PageHeader) page;

	/* Ignore prune_xid (it's like a hint-bit) */
	phdr->pd_prune_xid = MASK_MARKER;

	/* Ignore PD_PAGE_FULL and PD_HAS_FREE_LINES flags, they are just hints. */
	PageClearFull(page);
	PageClearHasFreeLinePointers(page);

#if PG_VERSION_NUM >= 80400
	/*
	 * During replay, if the page LSN has advanced past our XLOG record's LSN,
	 * we don't mark the page all-visible. See heap_xlog_visible() for
	 * details.
	 */
	PageClearAllVisible(page);
#endif
}
/*
 * mask_unused_space
 *
 * Mask the unused space of a page between pd_lower and pd_upper.
 *
 * Raises ERROR if the page header pointers do not describe a sane layout,
 * since masking with corrupt pointers could scribble past the page.
 */
void
mask_unused_space(Page page)
{
	int			pd_lower = ((PageHeader) page)->pd_lower;
	int			pd_upper = ((PageHeader) page)->pd_upper;
	int			pd_special = ((PageHeader) page)->pd_special;

	/* Sanity check */
	if (pd_lower > pd_upper || pd_special < pd_upper ||
		pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ)
	{
		/*
		 * Message fixes vs. the original: no trailing newline (elog messages
		 * must not end with one per PostgreSQL style), and %d for the int
		 * arguments instead of %u.
		 */
		elog(ERROR, "invalid page pd_lower %d pd_upper %d pd_special %d",
			 pd_lower, pd_upper, pd_special);
	}

	/* Wipe the hole between the line-pointer array and the tuple data. */
	memset(page + pd_lower, MASK_MARKER, pd_upper - pd_lower);
}
/*
 * mask_lp_flags
 *
 * In some index AMs, line pointer flags can be modified on the master
 * without emitting any WAL record, so they must be ignored when comparing
 * pages.  Reset the flags of every used line pointer to LP_UNUSED.
 */
void
mask_lp_flags(Page page)
{
	OffsetNumber nitems = PageGetMaxOffsetNumber(page);
	OffsetNumber off;

	for (off = FirstOffsetNumber; off <= nitems; off = OffsetNumberNext(off))
	{
		ItemId		lp = PageGetItemId(page, off);

		/* Only touch line pointers that are actually in use. */
		if (ItemIdIsUsed(lp))
			lp->lp_flags = LP_UNUSED;
	}
}
/*
 * mask_page_content
 *
 * In some index AMs, the contents of deleted pages need to be almost
 * completely ignored.  Everything after the fixed-size page header is
 * overwritten, and the pd_lower/pd_upper pointers themselves are masked
 * too since they describe the wiped region.
 */
void
mask_page_content(Page page)
{
	PageHeader	phdr = (PageHeader) page;

	/* Blank out the whole page body past the header. */
	memset(page + SizeOfPageHeaderData, MASK_MARKER,
		   BLCKSZ - SizeOfPageHeaderData);

	/* pd_lower and pd_upper live inside the header; mask them separately. */
	memset(&phdr->pd_lower, MASK_MARKER, sizeof(uint16));
	memset(&phdr->pd_upper, MASK_MARKER, sizeof(uint16));
}
......@@ -17,6 +17,8 @@
#include "access/gin.h"
#include "access/heapam.h"
#include "access/bufmask.h"
#include "utils/memutils.h"
#include "utils/guc.h"
......@@ -172,7 +174,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
}
/* nothing else to do if page was backed up */
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(data->node);
......@@ -404,7 +406,7 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
Page page;
/* nothing to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(data->node);
......@@ -472,7 +474,7 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
// -------- MirroredLock ----------
MIRROREDLOCK_BUFMGR_LOCK;
if (!(record->xl_info & XLR_BKP_BLOCK_1))
if (!(IsBkpBlockApplied(record, 0)))
{
buffer = XLogReadBuffer(reln, data->blkno, false);
if (BufferIsValid(buffer))
......@@ -489,7 +491,7 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
}
}
if (!(record->xl_info & XLR_BKP_BLOCK_2))
if (!(IsBkpBlockApplied(record, 1)))
{
buffer = XLogReadBuffer(reln, data->parentBlkno, false);
if (BufferIsValid(buffer))
......@@ -507,7 +509,7 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
}
}
if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
if (!(IsBkpBlockApplied(record, 2)) && data->leftBlkno != InvalidBlockNumber)
{
buffer = XLogReadBuffer(reln, data->leftBlkno, false);
if (BufferIsValid(buffer))
......@@ -721,3 +723,36 @@ gin_safe_restartpoint(void)
return false;
return true;
}
/*
 * Mask a GIN page before running consistency checks on it.
 *
 * 'pagedata' is the BLCKSZ-sized page image, masked in place.  'blkno' is
 * unused here but kept so all rm_mask callbacks share one signature.
 */
void
gin_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	GinPageOpaque opaque;

	/* Common masking: LSN/checksum and header hint bits. */
	mask_page_lsn_and_checksum(page);
	opaque = GinPageGetOpaque(page);
	mask_page_hint_bits(page);

	/*
	 * GIN metapage doesn't use pd_lower/pd_upper. Other page types do. Hence,
	 * we need to apply masking for those pages.
	 */
#if PG_VERSION_NUM >= 80400
	if (opaque->flags != GIN_META)
#endif
	{
		/*
		 * For GIN_DELETED page, the page is initialized to empty. Hence, mask
		 * the page content.
		 */
		if (opaque->flags & GIN_DELETED)
			mask_page_content(page);
		else
			mask_unused_space(page);
	}
}
......@@ -13,6 +13,7 @@
*/
#include "postgres.h"
#include "access/bufmask.h"
#include "access/gist_private.h"
#include "access/heapam.h"
#include "miscadmin.h"
......@@ -204,7 +205,7 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
NULL);
/* nothing else to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
decodePageUpdateRecord(&xlrec, record);
......@@ -288,7 +289,7 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
Page page;
/* nothing else to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(xldata->node);
......@@ -630,6 +631,52 @@ gistMakePageLayout(Buffer *buffers, int nbuffers)
return res;
}
/*
 * Mask a Gist page before running consistency checks on it.
 *
 * 'pagedata' is the BLCKSZ-sized page image, masked in place.  'blkno' is
 * unused here but kept so all rm_mask callbacks share one signature.
 */
void
gist_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;

	/* Common masking: LSN/checksum, header hint bits, unused space. */
	mask_page_lsn_and_checksum(page);
	mask_page_hint_bits(page);
	mask_unused_space(page);

	/*
	 * NSN is nothing but a special purpose LSN. Hence, mask it for the same
	 * reason as mask_page_lsn.
	 */
	PageXLogRecPtrSet(GistPageGetOpaque(page)->nsn, (uint64) MASK_MARKER);

#if PG_VERSION_NUM >= 90100
	/*
	 * We update F_FOLLOW_RIGHT flag on the left child after writing WAL
	 * record. Hence, mask this flag. See gistplacetopage() for details.
	 */
	GistMarkFollowRight(page);
#endif

	if (GistPageIsLeaf(page))
	{
		/*
		 * In gist leaf pages, it is possible to modify the LP_FLAGS without
		 * emitting any WAL record. Hence, mask the line pointer flags. See
		 * gistkillitems() for details.
		 */
		mask_lp_flags(page);
	}

#if PG_VERSION_NUM >= 90600
	/*
	 * During gist redo, we never mark a page as garbage. Hence, mask it to
	 * ignore any differences.
	 */
	GistClearPageHasGarbage(page);
#endif
}
/*
* Continue insert after crash. In normal situations, there aren't any
* incomplete inserts, but if a crash occurs partway through an insertion
......
......@@ -39,6 +39,7 @@
*/
#include "postgres.h"
#include "access/bufmask.h"
#include "access/heapam.h"
#include "access/hio.h"
#include "access/multixact.h"
......@@ -5024,7 +5025,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
int ndead;
int nunused;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(xlrec->heapnode.node);
......@@ -5096,7 +5097,7 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(xlrec->heapnode.node);
......@@ -5211,7 +5212,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
ItemId lp = NULL;
HeapTupleHeader htup;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(xlrec->target.node);
......@@ -5309,7 +5310,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
xl_heap_header xlhdr;
uint32 newlen;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(xlrec->target.node);
......@@ -5427,7 +5428,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
int hsize;
uint32 newlen;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
{
if (samepage)
return; /* backup block covered both changes */
......@@ -5520,7 +5521,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
newt:;
if (record->xl_info & XLR_BKP_BLOCK_2)
if (IsBkpBlockApplied(record, 1))
{
MIRROREDLOCK_BUFMGR_UNLOCK;
// -------- MirroredLock ----------
......@@ -5635,7 +5636,7 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
ItemId lp = NULL;
HeapTupleHeader htup;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
reln = XLogOpenRelation(xlrec->target.node);
......@@ -5717,7 +5718,7 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
uint32 oldlen;
uint32 newlen;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
// -------- MirroredLock ----------
......@@ -6107,3 +6108,102 @@ RelationAllowedToGenerateXLogRecord(Relation relation)
return false;
}
/*
 * Mask a heap page before performing consistency checks on it.
 *
 * 'pagedata' is the BLCKSZ-sized page image, masked in place.  'blkno' is
 * the block number this page came from; it is needed to normalize t_ctid
 * fields that redo sets to the tuple's own position.
 */
void
heap_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	OffsetNumber off;

	/* Common masking: LSN/checksum, header hint bits, unused space. */
	mask_page_lsn_and_checksum(page);
	mask_page_hint_bits(page);
	mask_unused_space(page);

	/* Now walk every line pointer and normalize the tuples themselves. */
	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
	{
		ItemId		iid = PageGetItemId(page, off);
		char	   *page_item;

		page_item = (char *) (page + ItemIdGetOffset(iid));

		if (ItemIdIsNormal(iid))
		{
			HeapTupleHeader page_htup = (HeapTupleHeader) page_item;

			/*
			 * During normal operation, the ctid is used to follow the update
			 * chain, to find the latest tuple version, if a READ COMMITTED
			 * transaction tries to update the updated tuple. But after
			 * restart and WAL replay, there cannot be any live transactions
			 * that would see the old tuple version. That's why during WAL
			 * redo ctid is just set to itself. Hence for MOVED case set
			 * t_ctid to current block number and self offset number to ignore
			 * any inconsistency.
			 */
			if (page_htup->t_infomask & HEAP_MOVED)
			{
				ItemPointerSet(&page_htup->t_ctid, blkno, off);
			}

			/*
			 * If xmin of a tuple is not yet frozen, we should ignore
			 * differences in hint bits, since they can be set without
			 * emitting WAL.
			 */
			if (!(((page_htup)->t_infomask & (HEAP_XMIN_FROZEN)) == HEAP_XMIN_FROZEN))
				page_htup->t_infomask &= ~HEAP_XACT_MASK;
			else
			{
				/* Still we need to mask xmax hint bits. */
				page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
				page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
			}

			/* mask out GPDB specific hint-bits */
			page_htup->t_infomask2 &= ~HEAP_XMIN_DISTRIBUTED_SNAPSHOT_IGNORE;
			page_htup->t_infomask2 &= ~HEAP_XMAX_DISTRIBUTED_SNAPSHOT_IGNORE;

			/*
			 * During replay, we set Command Id to FirstCommandId. Hence, mask
			 * it. See heap_xlog_insert() for details.
			 */
			page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;

#if PG_VERSION_NUM >= 90500
			/*
			 * For a speculative tuple, heap_insert() does not set ctid in the
			 * caller-passed heap tuple itself, leaving the ctid field to
			 * contain a speculative token value - a per-backend monotonically
			 * increasing identifier. Besides, it does not WAL-log ctid under
			 * any circumstances.
			 *
			 * During redo, heap_xlog_insert() sets t_ctid to current block
			 * number and self offset number. It doesn't care about any
			 * speculative insertions in master. Hence, we set t_ctid to
			 * current block number and self offset number to ignore any
			 * inconsistency.
			 */
			if (HeapTupleHeaderIsSpeculative(page_htup))
				ItemPointerSet(&page_htup->t_ctid, blkno, off);
#endif
		}

		/*
		 * Ignore any padding bytes after the tuple, when the length of the
		 * item is not MAXALIGNed.
		 */
		if (ItemIdHasStorage(iid))
		{
			int			len = ItemIdGetLength(iid);
			int			padlen = MAXALIGN(len) - len;

			if (padlen > 0)
				memset(page_item + len, MASK_MARKER, padlen);
		}
	}
}
......@@ -13,10 +13,11 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "utils/guc.h"
#include "miscadmin.h"
#include "access/bufmask.h"
/*
* We must keep track of expected insertions due to page splits, and apply
......@@ -220,7 +221,7 @@ btree_xlog_insert(bool isleaf, bool ismeta,
datalen -= sizeof(xl_btree_metadata);
}
if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
if ((IsBkpBlockApplied(record, 0)) && !ismeta && isleaf)
return; /* nothing to do */
reln = XLogOpenRelation(xlrec->target.node);
......@@ -228,7 +229,7 @@ btree_xlog_insert(bool isleaf, bool ismeta,
// -------- MirroredLock ----------
MIRROREDLOCK_BUFMGR_LOCK;
if (!(record->xl_info & XLR_BKP_BLOCK_1))
if (!(IsBkpBlockApplied(record, 0)))
{
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
......@@ -309,7 +310,7 @@ btree_xlog_split(bool onleft, bool isroot,
forget_matching_split(xlrec->node, downlink, false);
/* Extract left hikey and its size (still assuming 16-bit alignment) */
if (!(record->xl_info & XLR_BKP_BLOCK_1))
if (!(IsBkpBlockApplied(record, 0)))
{
/* We assume 16-bit alignment is enough for IndexTupleSize */
left_hikey = (Item) datapos;
......@@ -329,7 +330,7 @@ btree_xlog_split(bool onleft, bool isroot,
datalen -= sizeof(OffsetNumber);
}
if (onleft && !(record->xl_info & XLR_BKP_BLOCK_1))
if (onleft && !(IsBkpBlockApplied(record, 0)))
{
/*
* We assume that 16-bit alignment is enough to apply IndexTupleSize
......@@ -381,7 +382,7 @@ btree_xlog_split(bool onleft, bool isroot,
* item number order, but it does not reproduce the physical order they
* would have had. Is this worth changing? See also _bt_restore_page().
*/
if (!(record->xl_info & XLR_BKP_BLOCK_1))
if (!(IsBkpBlockApplied(record, 0)))
{
Buffer lbuf = XLogReadBuffer(reln, xlrec->leftsib, false);
......@@ -450,7 +451,7 @@ btree_xlog_split(bool onleft, bool isroot,
UnlockReleaseBuffer(rbuf);
/* Fix left-link of the page to the right of the new right sibling */
if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
if (xlrec->rnext != P_NONE && !(IsBkpBlockApplied(record, 1)))
{
Buffer buffer = XLogReadBuffer(reln, xlrec->rnext, false);
......@@ -490,7 +491,7 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
Page page;
BTPageOpaque opaque;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
xlrec = (xl_btree_delete *) XLogRecGetData(record);
......@@ -570,7 +571,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
MIRROREDLOCK_BUFMGR_LOCK;
/* parent page */
if (!(record->xl_info & XLR_BKP_BLOCK_1))
if (!(IsBkpBlockApplied(record, 0)))
{
buffer = XLogReadBuffer(reln, parent, false);
REDO_PRINT_READ_BUFFER_NOT_FOUND(reln, parent, buffer, lsn);
......@@ -617,7 +618,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
}
/* Fix left-link of right sibling */
if (!(record->xl_info & XLR_BKP_BLOCK_2))
if (!(IsBkpBlockApplied(record, 1)))
{
buffer = XLogReadBuffer(reln, rightsib, false);
REDO_PRINT_READ_BUFFER_NOT_FOUND(reln, rightsib, buffer, lsn);
......@@ -642,7 +643,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
}
/* Fix right-link of left sibling, if any */
if (!(record->xl_info & XLR_BKP_BLOCK_3))
if (!(IsBkpBlockApplied(record, 2)))
{
if (leftsib != P_NONE)
{
......@@ -853,14 +854,14 @@ out_insert(StringInfo buf, bool isleaf, bool ismeta, XLogRecord *record)
datalen -= sizeof(xl_btree_metadata);
}
if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
if ((IsBkpBlockApplied(record, 0)) && !ismeta && isleaf)
{
appendStringInfo(buf, "; page %u",
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
return; /* nothing to do */
}
if (!(record->xl_info & XLR_BKP_BLOCK_1))
if (!(IsBkpBlockApplied(record, 0)))
{
appendStringInfo(buf, "; add length %d item at offset %d in page %u",
datalen,
......@@ -890,7 +891,7 @@ out_delete(StringInfo buf, XLogRecord *record)
char *rec = XLogRecGetData(record);
xl_btree_delete *xlrec = (xl_btree_delete *) rec;
if (record->xl_info & XLR_BKP_BLOCK_1)
if (IsBkpBlockApplied(record, 0))
return;
xlrec = (xl_btree_delete *) XLogRecGetData(record);
......@@ -1143,3 +1144,52 @@ btree_safe_restartpoint(void)
return false;
return true;
}
/*
 * Mask a btree page before performing consistency checks on it.
 *
 * 'pagedata' is the BLCKSZ-sized page image, masked in place.  'blkno' is
 * unused here but kept so all rm_mask callbacks share one signature.
 */
void
btree_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	BTPageOpaque maskopaq;

	/* Common masking: LSN/checksum, header hint bits, unused space. */
	mask_page_lsn_and_checksum(page);
	mask_page_hint_bits(page);
	mask_unused_space(page);

	maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);

	if (P_ISDELETED(maskopaq))
	{
		/*
		 * Mask page content on a DELETED page since it will be re-initialized
		 * during replay. See btree_xlog_unlink_page() for details.
		 */
		mask_page_content(page);
	}
	else if (P_ISLEAF(maskopaq))
	{
		/*
		 * In btree leaf pages, it is possible to modify the LP_FLAGS without
		 * emitting any WAL record. Hence, mask the line pointer flags. See
		 * _bt_killitems(), _bt_check_unique() for details.
		 */
		mask_lp_flags(page);
	}

	/*
	 * BTP_HAS_GARBAGE is just an un-logged hint bit. So, mask it. See
	 * _bt_killitems(), _bt_check_unique() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;

	/*
	 * During replay of a btree page split, we don't set the BTP_SPLIT_END
	 * flag of the right sibling and initialize the cycle_id to 0 for the same
	 * page. See btree_xlog_split() for details.
	 */
	maskopaq->btpo_flags &= ~BTP_SPLIT_END;
	maskopaq->btpo_cycleid = 0;
}
......@@ -26,28 +26,28 @@
#include "cdb/cdbappendonlyam.h"
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},
{"Transaction", xact_redo, xact_desc, NULL, NULL, NULL},
{"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL},
{"CLOG", clog_redo, clog_desc, NULL, NULL, NULL},
{"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},
{"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
{"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
{"Reserved 7", NULL, NULL, NULL, NULL, NULL},
{"Reserved 8", NULL, NULL, NULL, NULL, NULL},
{"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
{"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
{"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint},
{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL},
{"Bitmap", bitmap_redo, bitmap_desc, bitmap_xlog_startup, bitmap_xlog_cleanup, bitmap_safe_restartpoint},
{"DistributedLog", DistributedLog_redo, DistributedLog_desc, NULL, NULL, NULL},
{"Master Mirror Log Records", mmxlog_redo, mmxlog_desc, NULL, NULL, NULL},
{"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL, NULL},
{"Transaction", xact_redo, xact_desc, NULL, NULL, NULL, NULL},
{"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL, NULL},
{"CLOG", clog_redo, clog_desc, NULL, NULL, NULL, NULL},
{"Database", dbase_redo, dbase_desc, NULL, NULL, NULL, NULL},
{"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL, NULL},
{"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL, NULL},
{"Reserved 7", NULL, NULL, NULL, NULL, NULL, NULL},
{"Reserved 8", NULL, NULL, NULL, NULL, NULL, NULL},
{"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL, heap_mask},
{"Heap", heap_redo, heap_desc, NULL, NULL, NULL, heap_mask},
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint, btree_mask},
{"Hash", hash_redo, hash_desc, NULL, NULL, NULL, NULL},
{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint, gin_mask},
{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint, gist_mask},
{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL, seq_mask},
{"Bitmap", bitmap_redo, bitmap_desc, bitmap_xlog_startup, bitmap_xlog_cleanup, bitmap_safe_restartpoint, NULL},
{"DistributedLog", DistributedLog_redo, DistributedLog_desc, NULL, NULL, NULL, NULL},
{"Master Mirror Log Records", mmxlog_redo, mmxlog_desc, NULL, NULL, NULL, NULL},
#ifdef USE_SEGWALREP
{"Appendonly Table Log Records", appendonly_redo, appendonly_desc, NULL, NULL, NULL}
{"Appendonly Table Log Records", appendonly_redo, appendonly_desc, NULL, NULL, NULL, NULL}
#endif /* USE_SEGWALREP */
};
......@@ -106,6 +106,8 @@ char *XLogArchiveCommand = NULL;
char *XLOG_sync_method = NULL;
const char XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
bool fullPageWrites = true;
char *wal_consistency_checking_string = NULL;
bool *wal_consistency_checking = NULL;
bool log_checkpoints = false;
#ifdef WAL_DEBUG
......@@ -177,6 +179,9 @@ static TransactionId recoveryTargetXid;
static TimestampTz recoveryTargetTime;
static TimestampTz recoveryLastXTime = 0;
static char *replay_image_masked = NULL;
static char *master_image_masked = NULL;
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyModeRequested = false;
static char *PrimaryConnInfo = NULL;
......@@ -625,9 +630,11 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void Checkpoint_RecoveryPass(XLogRecPtr checkPointRedo);
static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
bool wal_check_consistency_enabled,
XLogRecPtr *lsn, BkpBlock *bkpb);
static void RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
char *blk, bool get_cleanup_lock, bool keep_buffer);
static bool AdvanceXLInsertBuffer(bool new_segment);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
static void XLogFileInit(
......@@ -717,6 +724,7 @@ void HandleStartupProcInterrupts(void);
static bool CheckForStandbyTrigger(void);
static void GetXLogCleanUpTo(XLogRecPtr recptr, uint32 *_logId, uint32 *_logSeg);
static void checkXLogConsistency(XLogRecord *record, XLogRecPtr EndRecPtr);
/*
* Whether we need to always generate transaction log (XLOG), or if we can
......@@ -846,6 +854,7 @@ XLogInsert_Internal(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId h
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
bool rdata_iscopy = false;
uint8 extended_info = 0;
/* Safety check in case our assumption is ever broken. */
/* NOTE: This is slightly modified from the one in xact.c -- the test for */
......@@ -897,6 +906,15 @@ XLogInsert_Internal(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId h
return RecPtr;
}
/*
* Enforce consistency checks for this record if user is looking for
* it. Do this before at the beginning of this routine to give the
* possibility for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY
* directly for a record.
*/
if (wal_consistency_checking[rmid])
extended_info |= XLR_CHECK_CONSISTENCY;
/*
* Here we scan the rdata chain, determine which buffers must be backed
* up, and compute the CRC values for the data. Note that the record
......@@ -948,8 +966,13 @@ begin:;
{
if (rdt->buffer == dtbuf[i])
{
/* Buffer already referenced by earlier chain item */
if (dtbuf_bkp[i])
/*
* Buffer already referenced by earlier chain item and
* will be applied then only ignore it. Block can exist
* for consistency check purpose and hence should include
* original data along if its only for that purpose.
*/
if (dtbuf_bkp[i] && (dtbuf_xlg[i].block_info & BLOCK_APPLY))
rdt->data = NULL;
else if (rdt->data)
{
......@@ -962,11 +985,23 @@ begin:;
{
/* OK, put it in this slot */
dtbuf[i] = rdt->buffer;
if (doPageWrites && XLogCheckBuffer(rdt, true,
(extended_info & XLR_CHECK_CONSISTENCY) != 0,
&(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
{
dtbuf_bkp[i] = true;
rdt->data = NULL;
if (dtbuf_xlg[i].block_info & BLOCK_APPLY)
rdt->data = NULL;
else
{
if (rdt->data)
{
len += rdt->len;
COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
}
}
}
else if (rdt->data)
{
......@@ -1214,6 +1249,7 @@ begin:;
record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info;
record->xl_rmid = rmid;
record->xl_extended_info = extended_info;
/* Now we can finish computing the record's CRC */
COMP_CRC32C(rdata_crc, (char *) record + sizeof(pg_crc32),
......@@ -1525,9 +1561,11 @@ XLogLastInsertDataLen(void)
*/
static bool
XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
bool wal_check_consistency_enabled,
XLogRecPtr *lsn, BkpBlock *bkpb)
{
PageHeader page;
bool needs_backup;
page = (PageHeader) BufferGetBlock(rdata->buffer);
......@@ -1542,13 +1580,26 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
else
*lsn = BufferGetLSNAtomic(rdata->buffer);
if (XLByteLE(*lsn, RedoRecPtr))
needs_backup = XLByteLE(page->pd_lsn, RedoRecPtr);
if (needs_backup || wal_check_consistency_enabled)
{
/*
* The page needs to be backed up, so set up *bkpb
*/
bkpb->node = BufferGetFileNode(rdata->buffer);
bkpb->block = BufferGetBlockNumber(rdata->buffer);
bkpb->block_info = 0;
/*
* If WAL consistency checking is enabled for the
* resource manager of this WAL record, a full-page
* image is included in the record for the block
* modified. During redo, the full-page is replayed
* only if block_apply is set.
*/
if (needs_backup)
bkpb->block_info |= BLOCK_APPLY;
if (rdata->buffer_std)
{
......@@ -3386,9 +3437,9 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
RestoreBackupBlockContents(lsn, bkpb, blk, false, /* get_cleanup_lock is ignored in GPDB */
false);
/* get_cleanup_lock is ignored in GPDB */
RestoreBackupBlockContents(lsn, bkpb, blk, false, false);
blk += BLCKSZ - bkpb.hole_length;
}
}
......@@ -3398,7 +3449,7 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
*
* Restores a full-page image from BkpBlock and a data pointer.
*/
static Buffer
static void
RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
bool get_cleanup_lock, bool keep_buffer)
{
......@@ -3406,6 +3457,9 @@ RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
Page page;
Relation reln;
if (! (bkpb.block_info & BLOCK_APPLY))
return;
MIRROREDLOCK_BUFMGR_DECLARE;
reln = XLogOpenRelation(bkpb.node);
......@@ -3451,7 +3505,34 @@ RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
MIRROREDLOCK_BUFMGR_UNLOCK;
// -------- MirroredLock ----------
return buffer;
return;
}
/*
 * IsBkpBlockApplied
 *
 * Returns true if backup block 'block_id' (0-based) is present in 'record'
 * and carries the BLOCK_APPLY flag, meaning its full-page image is applied
 * during redo.  Returns false if the block is absent from the record, or if
 * it was included only for consistency checking (BLOCK_APPLY not set).
 */
bool
IsBkpBlockApplied(XLogRecord *record, uint8 block_id)
{
	BkpBlock	bkpb;
	char	   *blk;
	int			i;

	Assert(block_id < XLR_MAX_BKP_BLOCKS);

	/* Backup blocks are stored back-to-back after the record's main data. */
	blk = (char *) XLogRecGetData(record) + record->xl_len;
	for (i = 0; i <= block_id; i++)
	{
		/* Skip slots for which no backup block was emitted. */
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
			continue;

		/* Copy the header out; 'blk' may not be suitably aligned. */
		memcpy(&bkpb, blk, sizeof(BkpBlock));
		blk += sizeof(BkpBlock);

		if (i == block_id)
			return (bkpb.block_info & BLOCK_APPLY) != 0;

		/* Step over this block's image (the "hole" is not stored). */
		blk += BLCKSZ - bkpb.hole_length;
	}

	/* Requested block is not present in this record. */
	return false;
}
/*
......@@ -6370,6 +6451,14 @@ ApplyStartupRedo(
RmgrTable[record->xl_rmid].rm_redo(*beginLoc, *lsn, record);
/*
* After redo, check whether the backup pages associated with
* the WAL record are consistent with the existing pages. This
* check is done only if consistency check is enabled for this
* record.
*/
if ((record->xl_extended_info & XLR_CHECK_CONSISTENCY) != 0)
checkXLogConsistency(record, *lsn);
/* Pop the error context stack */
error_context_stack = errcontext.previous;
......@@ -6588,6 +6677,13 @@ StartupXLOG(void)
if (StandbyModeRequested)
OwnLatch(&XLogCtl->recoveryWakeupLatch);
/*
* Allocate pages dedicated to WAL consistency checks, those had better
* be aligned.
*/
replay_image_masked = (char *) palloc(BLCKSZ);
master_image_masked = (char *) palloc(BLCKSZ);
if (read_backup_label(&checkPointLoc, &backupEndRequired))
{
/*
......@@ -7614,6 +7710,13 @@ StartupXLOG_Pass3(void)
SetupCheckpointPreparedTransactionList(ckptExtended.ptas);
}
/*
* Allocate pages dedicated to WAL consistency checks, those had better
* be aligned.
*/
replay_image_masked = (char *) palloc(BLCKSZ);
master_image_masked = (char *) palloc(BLCKSZ);
record = XLogReadRecord(&XLogCtl->pass1StartLoc, false, PANIC);
/*
......@@ -9593,7 +9696,7 @@ XLogSaveBufferForHint(Buffer buffer, Relation relation)
/*
* Check buffer while not holding an exclusive lock.
*/
if (XLogCheckBuffer(rdata, false, &lsn, &bkpbwithpt.bkpb))
if (XLogCheckBuffer(rdata, false, false, &lsn, &bkpbwithpt.bkpb))
{
char copied_buffer[BLCKSZ];
char *origdata = (char *) BufferGetBlock(buffer);
......@@ -12191,3 +12294,140 @@ GetXLogCleanUpTo(XLogRecPtr recptr, uint32 *_logId, uint32 *_logSeg)
}
#endif
}
/*
 * Checks whether the current buffer page and backup page stored in the
 * WAL record are consistent or not. Before comparing the two pages, a
 * masking can be applied to the pages to ignore certain areas like hint bits,
 * unused space between pd_lower and pd_upper among other things. This
 * function should be called once WAL replay has been completed for a
 * given record.  Any mismatch after masking is a FATAL error.
 */
static void
checkXLogConsistency(XLogRecord *record, XLogRecPtr EndRecPtr)
{
	MIRROREDLOCK_BUFMGR_DECLARE;

	RmgrId		rmid = record->xl_rmid;
	char	   *blk;

	/* Records with no backup blocks have no need for consistency checks. */
	if (!(record->xl_info & XLR_BKP_BLOCK_MASK))
		return;

	Assert((record->xl_extended_info & XLR_CHECK_CONSISTENCY) != 0);

	/* Backup blocks are stored immediately after the rmgr data. */
	blk = (char *) XLogRecGetData(record) + record->xl_len;
	for (int i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		Relation	reln;
		BkpBlock	bkpb;
		Buffer		buf;
		Page		page;
		char	   *src_buffer;

		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
		{
			/*
			 * WAL record doesn't contain a block do nothing.
			 */
			continue;
		}

		memcpy(&bkpb, blk, sizeof(BkpBlock));
		blk += sizeof(BkpBlock);
		src_buffer = blk;
		/* move on to point to next block */
		blk += BLCKSZ - bkpb.hole_length;

		if (bkpb.block_info & BLOCK_APPLY)
		{
			/*
			 * WAL record has already applied the page, so bypass the
			 * consistency check as that would result in comparing the full
			 * page stored in the record with itself.
			 */
			continue;
		}

		reln = XLogOpenRelation(bkpb.node);

		// -------- MirroredLock ----------
		MIRROREDLOCK_BUFMGR_LOCK;

		/*
		 * Read the contents from the current buffer and store it in a
		 * temporary page.
		 */
		buf = XLogReadBuffer(reln, bkpb.block, false);
		if (!BufferIsValid(buf))
		{
			/*
			 * The block no longer exists; release the buffer-manager lock
			 * before skipping it, otherwise the lock taken above would be
			 * leaked for the rest of this (and subsequent) records.
			 */
			MIRROREDLOCK_BUFMGR_UNLOCK;
			// -------- MirroredLock ----------
			continue;
		}

		page = BufferGetPage(buf);

		/*
		 * Take a copy of the local page where WAL has been applied to have a
		 * comparison base before masking it...
		 */
		memcpy(replay_image_masked, page, BLCKSZ);

		/* No need for this page anymore now that a copy is in. */
		UnlockReleaseBuffer(buf);

		MIRROREDLOCK_BUFMGR_UNLOCK;
		// -------- MirroredLock ----------

		/*
		 * If the block LSN is already ahead of this WAL record, we can't
		 * expect contents to match. This can happen if recovery is
		 * restarted.
		 */
		if (XLByteLT(EndRecPtr, PageGetLSN(replay_image_masked)))
			continue;

		/*
		 * Read the contents from the backup copy, stored in WAL record and
		 * store it in a temporary page. There is no need to allocate a new
		 * page here, a local buffer is fine to hold its contents and a mask
		 * can be directly applied on it.
		 */
		if (bkpb.hole_length == 0)
		{
			memcpy((char *) master_image_masked, src_buffer, BLCKSZ);
		}
		else
		{
			/* zero-fill the hole, anyways gets masked out */
			MemSet((char *) master_image_masked, 0, BLCKSZ);
			memcpy((char *) master_image_masked, src_buffer, bkpb.hole_offset);
			memcpy((char *) master_image_masked + (bkpb.hole_offset + bkpb.hole_length),
				   src_buffer + bkpb.hole_offset,
				   BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
		}

		/*
		 * If masking function is defined, mask both the master and replay
		 * images
		 */
		if (RmgrTable[rmid].rm_mask != NULL)
		{
			RmgrTable[rmid].rm_mask(replay_image_masked, bkpb.block);
			RmgrTable[rmid].rm_mask(master_image_masked, bkpb.block);
		}

		/* Time to compare the master and replay images. */
		if (memcmp(replay_image_masked, master_image_masked, BLCKSZ) != 0)
		{
			elog(FATAL,
				 "inconsistent page found, rel %u/%u/%u, blkno %u",
				 bkpb.node.spcNode, bkpb.node.dbNode, bkpb.node.relNode,
				 bkpb.block);
		}
		else
		{
			elog(DEBUG1,
				 "Consistent page for rel %u/%u/%u, blkno %u",
				 bkpb.node.spcNode, bkpb.node.dbNode, bkpb.node.relNode,
				 bkpb.block);
		}
	}
}
......@@ -17,6 +17,7 @@
#include "postgres.h"
#include "access/heapam.h"
#include "access/bufmask.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/dependency.h"
......@@ -2007,3 +2008,14 @@ cdb_sequence_nextval_server(Oid tablespaceid,
/* Cleanup. */
cdb_sequence_relation_term(seqrel);
} /* cdb_sequence_server_nextval */
/*
 * Mask a Sequence page before performing consistency checks on it.
 *
 * Only the page LSN/checksum and the unused space between pd_lower and
 * pd_upper legitimately differ between master and replay images, so those
 * are the only areas masked out.
 */
void
seq_mask(char *page, BlockNumber blkno)
{
	/* blkno is unused: the same masking applies to every sequence page */
	mask_unused_space(page);
	mask_page_lsn_and_checksum(page);
}
......@@ -28,9 +28,11 @@
#endif
#include "access/gin.h"
#include "access/rmgr.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "commands/prepare.h"
......@@ -138,6 +140,9 @@ extern char *SSLCipherSuites;
static const char *assign_log_destination(const char *value,
bool doit, GucSource source);
static const char *assign_wal_consistency_checking(const char *newval,
bool doit, GucSource source);
#ifdef HAVE_SYSLOG
static int syslog_facility = LOG_LOCAL0;
......@@ -2690,6 +2695,7 @@ static struct config_string ConfigureNamesString[] =
&external_pid_file,
NULL, assign_canonical_path, NULL
},
/* placed here as a temporary hack until we get guc enums */
{
{"bytea_output", PGC_USERSET, CLIENT_CONN_STATEMENT,
......@@ -2699,6 +2705,18 @@ static struct config_string ConfigureNamesString[] =
&bytea_output_temp,
"escape", assign_bytea, NULL, NULL
},
{
{"wal_consistency_checking", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Sets the WAL resource managers for which WAL consistency checks are done."),
gettext_noop("Full-page images will be logged for all data blocks and cross-checked against the results of WAL replay."),
GUC_LIST_INPUT | GUC_NOT_IN_SAMPLE
},
&wal_consistency_checking_string,
"",
assign_wal_consistency_checking, NULL
},
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL
......@@ -7400,6 +7418,96 @@ assign_msglvl(int *var, const char *newval, bool doit, GucSource source)
return newval; /* OK */
}
/*
* assign_hook and show_hook subroutines
*/
/*
 * assign_hook for the wal_consistency_checking GUC.
 *
 * Parses newval as a comma-separated list of resource manager names (or
 * "all") and, when doit is true, installs the resulting per-rmgr boolean
 * array into wal_consistency_checking.  Returns newval on success, NULL on
 * a parse error (reported at the elevel appropriate for the GUC source).
 */
static const char *
assign_wal_consistency_checking(const char *newval, bool doit, GucSource source)
{
	char	   *rawstring;
	List	   *elemlist;
	ListCell   *l;
	bool		newwalconsistency[RM_MAX_ID + 1];

	/* Initialize the array */
	MemSet(newwalconsistency, 0, (RM_MAX_ID + 1) * sizeof(bool));

	/* Need a modifiable copy of string */
	rawstring = guc_strdup(ERROR, newval);

	/* Parse string into list of identifiers */
	if (!SplitIdentifierString(rawstring, ',', &elemlist))
	{
		free(rawstring);
		list_free(elemlist);
		/* syntax error in list */
		ereport(GUC_complaint_elevel(source),
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("List syntax is invalid.")));
		return NULL;
	}
	free(rawstring);

	foreach(l, elemlist)
	{
		char	   *tok = (char *) lfirst(l);
		bool		found = false;
		RmgrId		rmid;

		/* Check for 'all'. */
		if (pg_strcasecmp(tok, "all") == 0)
		{
			/* Enable checking for every rmgr that provides a mask routine. */
			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
				if (RmgrTable[rmid].rm_mask != NULL)
					newwalconsistency[rmid] = true;
			found = true;
		}
		else
		{
			/*
			 * Check if the token matches with any individual resource
			 * manager.
			 */
			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
			{
				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
					RmgrTable[rmid].rm_mask != NULL)
				{
					newwalconsistency[rmid] = true;
					found = true;
				}
			}
		}
		/* If a valid resource manager is found, check for the next one. */
		if (!found)
		{
			list_free(elemlist);
			ereport(GUC_complaint_elevel(source),
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("Unrecognized key word: \"%s\".", tok)));
			return NULL;
		}
	}
	list_free(elemlist);

	if (doit)
	{
		/*
		 * Install the new value.  Free any previously installed array first
		 * so repeated (re)assignments do not leak; guc_malloc wraps plain
		 * malloc, so free() is the correct counterpart.
		 */
		bool	   *newarray = guc_malloc(ERROR, (RM_MAX_ID + 1) * sizeof(bool));

		memcpy(newarray, newwalconsistency, (RM_MAX_ID + 1) * sizeof(bool));
		if (wal_consistency_checking != NULL)
			free(wal_consistency_checking);
		wal_consistency_checking = newarray;
	}

	return newval;
}
static const char *
assign_IntervalStyle(const char *newval, bool doit, GucSource source)
{
......
/*-------------------------------------------------------------------------
 *
 * bufmask.h
 *	  Definitions for buffer masking routines, used to mask certain bits
 *	  in a page which can be different when the WAL is generated
 *	  and when the WAL is applied. This is really the job of each
 *	  individual rmgr, but we make things easier by providing some
 *	  common routines to handle cases which occur in multiple rmgrs.
 *
 * Portions Copyright (c) 2016, PostgreSQL Global Development Group
 *
 * src/include/access/bufmask.h
 *
 *-------------------------------------------------------------------------
 */

#ifndef BUFMASK_H
#define BUFMASK_H

#include "postgres.h"
#include "storage/block.h"
#include "storage/bufmgr.h"

/* Marker used to mask pages consistently */
#define MASK_MARKER 0

/* Mask out the page's LSN and checksum fields. */
extern void mask_page_lsn_and_checksum(Page page);
/* Mask out hint bits in the page header. */
extern void mask_page_hint_bits(Page page);
/* Mask the unused space between pd_lower and pd_upper. */
extern void mask_unused_space(Page page);
/* Mask the flag bits of each line pointer on the page. */
extern void mask_lp_flags(Page page);
/* Mask the whole content area of the page. */
extern void mask_page_content(Page page);

#endif
......@@ -471,4 +471,6 @@ extern void ginInsertRecordBA(BuildAccumulator *accum,
ItemPointer heapptr, Datum *entries, int32 nentry);
extern ItemPointerData *ginGetEntry(BuildAccumulator *accum, Datum *entry, uint32 *n);
extern void gin_mask(char *pagedata, BlockNumber blkno);
#endif
......@@ -284,6 +284,7 @@ extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
extern bool gist_safe_restartpoint(void);
extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
extern void gist_mask(char *pagedata, BlockNumber blkno);
extern XLogRecData *formUpdateRdata(Relation r, Buffer buffer,
OffsetNumber *todelete, int ntodelete,
......
......@@ -295,6 +295,7 @@ extern bool heap_getrelfilenode(
RelFileNode *relFileNode);
extern void heap2_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *rptr);
extern void heap2_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record);
extern void heap_mask(char *pagedata, BlockNumber blkno);
extern void log_heap_newpage(Relation rel,
Page page,
......
......@@ -175,6 +175,7 @@ typedef HeapTupleHeaderData *HeapTupleHeader;
#define HEAP_IS_LOCKED (HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_SHARED_LOCK)
#define HEAP_XMIN_COMMITTED 0x0100 /* t_xmin committed */
#define HEAP_XMIN_INVALID 0x0200 /* t_xmin invalid/aborted */
#define HEAP_XMIN_FROZEN (HEAP_XMIN_COMMITTED|HEAP_XMIN_INVALID)
#define HEAP_XMAX_COMMITTED 0x0400 /* t_xmax committed */
#define HEAP_XMAX_INVALID 0x0800 /* t_xmax invalid/aborted */
#define HEAP_XMAX_IS_MULTI 0x1000 /* t_xmax is a MultiXactId */
......
......@@ -635,5 +635,6 @@ extern void btree_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record);
extern void btree_xlog_startup(void);
extern void btree_xlog_cleanup(void);
extern bool btree_safe_restartpoint(void);
extern void btree_mask(char *pagedata, BlockNumber blkno);
#endif /* NBTREE_H */
......@@ -70,6 +70,7 @@ typedef struct XLogRecord
uint32 xl_len; /* total len of rmgr data */
uint8 xl_info; /* flag bits, see below */
RmgrId xl_rmid; /* resource manager for this record */
uint8 xl_extended_info; /* flag bits, see below */
/* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
......@@ -82,10 +83,20 @@ typedef struct XLogRecord
#define XLogRecGetData(record) ((char*) (record) + SizeOfXLogRecord)
/*
* XLOG uses only low 4 bits of xl_info. High 4 bits may be used by rmgr.
* XLOG uses only low 4 bits of xl_info. High 4 bits may be used by rmgr.
* XLR_CHECK_CONSISTENCY bits can be passed by XLogInsert caller.
*/
#define XLR_INFO_MASK 0x0F
/*
* Enforces consistency checks of replayed WAL at recovery. If enabled,
* each record will log a full-page write for each block modified by the
* record and will reuse it afterwards for consistency checks. The caller
* of XLogInsert can use this value if necessary, but if
* wal_consistency_checking is enabled for a rmgr this is set unconditionally.
*/
#define XLR_CHECK_CONSISTENCY 0x02
/*
* If we backed up any disk blocks with the XLOG record, we use flag bits in
* xl_info to signal it. We support backup of up to 3 disk blocks per XLOG
......@@ -167,6 +178,10 @@ extern char *XLOG_sync_method;
extern const char XLOG_sync_method_default[];
extern bool gp_keep_all_xlog;
extern int keep_wal_segments;
extern bool *wal_consistency_checking;
extern char *wal_consistency_checking_string;
extern bool log_checkpoints;
#define XLogArchivingActive() (XLogArchiveMode)
......@@ -360,4 +375,7 @@ extern void do_pg_abort_backup(void);
#define BACKUP_LABEL_FILE "backup_label"
#define BACKUP_LABEL_OLD "backup_label.old"
extern bool
IsBkpBlockApplied(XLogRecord *record, uint8 block_id);
#endif /* XLOG_H */
......@@ -45,10 +45,13 @@ typedef struct BkpBlock
BlockNumber block; /* block number */
uint16 hole_offset; /* number of bytes before "hole" */
uint16 hole_length; /* number of bytes in "hole" */
uint8 block_info; /* flags, controls to apply the block or not for now */
/* ACTUAL BLOCK DATA FOLLOWS AT END OF STRUCT */
} BkpBlock;
/* Information stored in block_info */
#define BLOCK_APPLY 0x01 /* page image should be restored during replay */
typedef struct BkpBlockWithPT
{
ItemPointerData persistentTid;
......@@ -79,7 +82,7 @@ typedef struct XLogContRecord
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD062 /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD063 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
......@@ -262,6 +265,10 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
* Method table for resource managers.
*
* RmgrTable[] is indexed by RmgrId values (see rmgr.h).
*
* rm_mask takes as input a page modified by the resource manager and masks
* out bits that shouldn't be flagged by wal_consistency_checking.
*
*/
typedef struct RmgrData
{
......@@ -271,6 +278,7 @@ typedef struct RmgrData
void (*rm_startup) (void);
void (*rm_cleanup) (void);
bool (*rm_safe_restartpoint) (void);
void (*rm_mask) (char *pagedata, BlockNumber blkno);
} RmgrData;
extern const RmgrData RmgrTable[];
......
......@@ -122,5 +122,6 @@ cdb_sequence_nextval_server(Oid tablespaceid,
int64 *pincrement,
bool *poverflow);
extern void seq_mask(char *pagedata, BlockNumber blkno);
#endif /* SEQUENCE_H */
......@@ -388,6 +388,8 @@ PageGetLSN(Page page)
*/
#define PageSetLSN(page, lsn) \
(((PageHeader) (page))->pd_lsn = (lsn))
#define PageXLogRecPtrSet(ptr, lsn) \
((ptr).xlogid = (uint32) ((lsn) >> 32), (ptr).xrecoff = (uint32) (lsn))
#define PageHasFreeLinePointers(page) \
(((PageHeader) (page))->pd_flags & PD_HAS_FREE_LINES)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册