提交 ed0efd2a 编写于 作者: S Simon Riggs 提交者: Xin Zhang

Allow I/O reliability checks using 16-bit checksums

Checksums are set immediately prior to flush out of shared buffers
and checked when pages are read in again. Hint bit setting will
require full page write when block is dirtied, which causes various
infrastructure changes. Extensive comments, docs and README.

A WARNING is thrown if a checksum fails on a page that is not all zeroes;
an ERROR is thrown as well, though it can be disabled with ignore_checksum_failure = on.

Feature enabled by an initdb option, since transition from option off
to option on is long and complex and has not yet been implemented.
Default is not to use checksums.

The checksum used is the WAL CRC-32 truncated to 16 bits.

Simon Riggs, Jeff Davis, Greg Smith
Wide input and assistance from many community members. Thank you.

(cherry picked from commit 96ef3b8f)
上级 626df6b4
......@@ -52,6 +52,7 @@ get_control_data(migratorContext *ctx, ClusterInfo *cluster, bool live_check)
bool got_toast = false;
bool got_date_is_int = false;
bool got_float8_pass_by_value = false;
bool got_data_checksums = false;
char *lc_collate = NULL;
char *lc_ctype = NULL;
char *lc_monetary = NULL;
......@@ -123,6 +124,13 @@ get_control_data(migratorContext *ctx, ClusterInfo *cluster, bool live_check)
got_float8_pass_by_value = true;
}
/* Only in <= 9.2 */
if (GET_MAJOR_VERSION(cluster->major_version) <= 902)
{
cluster->controldata.data_checksums = false;
got_data_checksums = true;
}
/* we have the result of cmd in "output". so parse it line by line now */
while (fgets(bufin, sizeof(bufin), output))
{
......@@ -360,6 +368,18 @@ get_control_data(migratorContext *ctx, ClusterInfo *cluster, bool live_check)
cluster->controldata.float8_pass_by_value = strstr(p, "by value") != NULL;
got_float8_pass_by_value = true;
}
else if ((p = strstr(bufin, "checksums")) != NULL)
{
p = strchr(p, ':');
if (p == NULL || strlen(p) <= 1)
pg_log(ctx, PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
p++; /* removing ':' char */
/* used later for contrib check */
cluster->controldata.data_checksums = strstr(p, "enabled") != NULL;
got_data_checksums = true;
}
/* In pre-8.4 only */
else if ((p = strstr(bufin, "LC_COLLATE:")) != NULL)
{
......@@ -425,7 +445,7 @@ get_control_data(migratorContext *ctx, ClusterInfo *cluster, bool live_check)
!got_tli ||
!got_align || !got_blocksz || !got_largesz || !got_walsz ||
!got_walseg || !got_ident || !got_index || /* !got_toast || */
!got_date_is_int || !got_float8_pass_by_value)
!got_date_is_int || !got_float8_pass_by_value || !got_data_checksums)
{
pg_log(ctx, PG_REPORT,
"Some required control information is missing; cannot find:\n");
......@@ -478,8 +498,12 @@ get_control_data(migratorContext *ctx, ClusterInfo *cluster, bool live_check)
if (!got_float8_pass_by_value)
pg_log(ctx, PG_REPORT, " float8 argument passing method\n");
/* value added in Postgres 9.3 */
if (!got_data_checksums)
pg_log(ctx, PG_REPORT, " data checksums\n");
pg_log(ctx, PG_FATAL,
"Unable to continue without required control information, terminating\n");
"Cannot continue without required control information, terminating\n");
}
}
......@@ -547,6 +571,12 @@ check_control_data(migratorContext *ctx, ControlData *oldctrl,
"--disable-integer-datetimes or get server binaries built\n"
"with those options.\n");
}
if (oldctrl->data_checksums != newctrl->data_checksums)
{
pg_log(ctx, PG_FATAL,
"old and new pg_controldata checksums settings are invalid or do not match\n");
}
}
......
......@@ -240,6 +240,7 @@ typedef struct
uint32 toast;
bool date_is_int;
bool float8_pass_by_value;
bool data_checksums;
char *lc_collate;
char *lc_ctype;
char *encoding;
......
......@@ -148,6 +148,20 @@ PostgreSQL documentation
</listitem>
</varlistentry>
<varlistentry id="app-initdb-data-checksums" xreflabel="data checksums">
<term><option>-k</option></term>
<term><option>--data-checksums</option></term>
<listitem>
<para>
Use checksums on data pages to help detect corruption by the
I/O system that would otherwise be silent. Enabling checksums
may incur a noticeable performance penalty. This option can only
be set during initialization, and cannot be changed later. If
set, checksums are calculated for all objects, in all databases.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--locale=<replaceable>locale</replaceable></option></term>
<listitem>
......
......@@ -49,7 +49,7 @@ killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr)
/* page unchanged, so all is simple */
offset = ItemPointerGetOffsetNumber(iptr);
ItemIdMarkDead(PageGetItemId(p, offset));
SetBufferCommitInfoNeedsSave(so->curbuf);
MarkBufferDirtyHint(so->curbuf);
}
else
{
......@@ -63,7 +63,7 @@ killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr)
{
/* found */
ItemIdMarkDead(PageGetItemId(p, offset));
SetBufferCommitInfoNeedsSave(so->curbuf);
MarkBufferDirtyHint(so->curbuf);
break;
}
}
......
......@@ -207,11 +207,9 @@ hashgettuple(PG_FUNCTION_ARGS)
ItemIdMarkDead(PageGetItemId(page, offnum));
/*
* Since this can be redone later if needed, it's treated the same
* as a commit-hint-bit status update for heap tuples: we mark the
* buffer dirty but don't make a WAL log entry.
* Since this can be redone later if needed, mark as a hint.
*/
SetBufferCommitInfoNeedsSave(so->hashso_curbuf);
MarkBufferDirtyHint(so->hashso_curbuf);
}
/*
......
......@@ -3288,7 +3288,6 @@ l2:
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
PageSetLSN(BufferGetPage(buffer), recptr);
PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
}
END_CRIT_SECTION();
......
......@@ -289,7 +289,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
{
((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
PageClearFull(page);
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
}
}
......
......@@ -266,6 +266,8 @@ end_heap_rewrite(RewriteState state)
/* Write the last page, if any */
if (state->rs_buffer_valid)
{
PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
if (state->rs_use_wal)
log_newpage_rel(state->rs_new_rel,state->rs_blockno, state->rs_buffer);
......@@ -601,6 +603,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
{
/* Doesn't fit, so write out the existing page */
PageSetChecksumInplace(page, state->rs_blockno);
/* XLOG stuff */
if (state->rs_use_wal)
log_newpage_rel(state->rs_new_rel, state->rs_blockno, page);
......
......@@ -441,9 +441,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
opaque->btpo_flags |= BTP_HAS_GARBAGE;
/* be sure to mark the proper buffer dirty... */
if (nbuf != InvalidBuffer)
SetBufferCommitInfoNeedsSave(nbuf);
MarkBufferDirtyHint(nbuf);
else
SetBufferCommitInfoNeedsSave(buf);
MarkBufferDirtyHint(buf);
}
}
}
......
......@@ -1245,7 +1245,7 @@ restart:
opaque->btpo_cycleid == vstate->cycleid)
{
opaque->btpo_cycleid = 0;
SetBufferCommitInfoNeedsSave(buf);
MarkBufferDirtyHint(buf);
}
}
......
......@@ -309,6 +309,7 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
// UNDONE: Unfortunately, I think we write temp relations to the mirror...
LWLockAcquire(MirroredLock, LW_SHARED);
/* don't set checksum for all-zero page */
smgrextend(wstate->index->rd_smgr, wstate->btws_pages_written++,
(char *) wstate->btws_zeropage,
true);
......@@ -321,6 +322,7 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
// -------- MirroredLock ----------
// UNDONE: Unfortunately, I think we write temp relations to the mirror...
LWLockAcquire(MirroredLock, LW_SHARED);
PageSetChecksumInplace(page, blkno);
/*
* Now write the page. We say isTemp = true even if it's not a temp
......
......@@ -1169,9 +1169,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
}
/*
* Since this can be redone later if needed, it's treated the same as a
* commit-hint-bit status update for heap tuples: we mark the buffer dirty
* but don't make a WAL log entry.
* Since this can be redone later if needed, mark as dirty hint.
*
* Whenever we mark anything LP_DEAD, we also set the page's
* BTP_HAS_GARBAGE flag, which is likewise just a hint.
......@@ -1179,7 +1177,7 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
if (killedsomething)
{
opaque->btpo_flags |= BTP_HAS_GARBAGE;
SetBufferCommitInfoNeedsSave(so->currPos.buf);
MarkBufferDirtyHint(so->currPos.buf);
}
if (!haveLock)
......
......@@ -437,6 +437,8 @@ critical section.)
4. Mark the shared buffer(s) as dirty with MarkBufferDirty(). (This must
happen before the WAL record is inserted; see notes in SyncOneBuffer().)
Note that marking a buffer dirty with MarkBufferDirty() should only
happen iff you write a WAL record; see Writing Hints below.
5. If the relation requires WAL-logging, build a WAL log record and pass it
to XLogInsert(); then update the page's LSN using the returned XLOG
......@@ -542,6 +544,29 @@ replay code has to do the insertion on its own to restore the index to
consistency. Such insertions occur after WAL is operational, so they can
and should write WAL records for the additional generated actions.
Writing Hints
-------------
In some cases, we write additional information to data blocks without
writing a preceding WAL record. This should only happen iff the data can
be reconstructed later following a crash and the action is simply a way
of optimising for performance. When a hint is written we use
MarkBufferDirtyHint() to mark the block dirty.
If the buffer is clean and checksums are in use then
MarkBufferDirtyHint() inserts an XLOG_HINT record to ensure that we
take a full page image that includes the hint. We do this to avoid
a partial page write, when we write the dirtied page. WAL is not
written during recovery, so we simply skip dirtying blocks because
of hints when in recovery.
If you do decide to optimise away a WAL record, then any calls to
MarkBufferDirty() must be replaced by MarkBufferDirtyHint(),
otherwise you will expose the risk of partial page writes.
In GPDB, gp_disable_tuple_hints GUC dictates whether a buffer is marked dirty
by a hint bit change. If the GUC is on, hint bit changes do not mark a buffer
dirty.
Asynchronous Commit
-------------------
......
......@@ -230,6 +230,8 @@ static uint32 ProcLastRecTotalLen = 0;
static uint32 ProcLastRecDataLen = 0;
static XLogRecPtr InvalidXLogRecPtr = {0, 0};
/*
* RedoRecPtr is this backend's local copy of the REDO record pointer
* (which is almost but not quite the same as a pointer to the most recent
......@@ -839,7 +841,7 @@ XLogInsert_Internal(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId h
bool updrqst;
bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
bool isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT);
bool rdata_iscopy = false;
/* Safety check in case our assumption is ever broken. */
......@@ -1146,6 +1148,18 @@ begin:;
if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)
info |= XLR_BKP_REMOVABLE;
/*
* If this is a hint record and we don't need a backup block then
* we have no more work to do and can exit quickly without inserting
* a WAL record at all. In that case return InvalidXLogRecPtr.
*/
if (isHint && !(info & XLR_BKP_BLOCK_MASK))
{
LWLockRelease(WALInsertLock);
END_CRIT_SECTION();
return InvalidXLogRecPtr;
}
/*
* If there isn't enough space on the current XLOG page for a record
* header, advance to the next page (leaving the unused space as zeroes).
......@@ -1531,10 +1545,10 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
* to XLogInsert, whether it otherwise has the standard page layout or
* not.
*/
*lsn = page->pd_lsn;
*lsn = BufferGetLSNAtomic(rdata->buffer);
if (doPageWrites &&
XLByteLE(page->pd_lsn, RedoRecPtr))
XLByteLE(*lsn, RedoRecPtr))
{
/*
* The page needs to be backed up, so set up *bkpb
......@@ -5426,16 +5440,6 @@ GetSystemIdentifier(void)
return ControlFile->system_identifier;
}
/*
* Are checksums enabled for data pages?
*/
bool
DataChecksumsEnabled(void)
{
Assert(ControlFile != NULL);
return (ControlFile->data_checksum_version > 0);
}
/*
* Initialization of shared memory for XLOG
*/
......@@ -5554,6 +5558,16 @@ XLogStartupInit(void)
ReadControlFile();
}
/*
 * Report whether data-page checksums are enabled for this cluster.
 *
 * The flag lives in pg_control, so this must not be called before the
 * control file has been read (see XLogStartupInit/ReadControlFile).
 */
bool
DataChecksumsEnabled(void)
{
	Assert(ControlFile != NULL);
	return ControlFile->data_checksum_version > 0;
}
/*
* This func must be called ONCE on system install. It creates pg_control
* and the initial XLOG segment.
......@@ -9512,6 +9526,51 @@ RequestXLogSwitch(void)
return RecPtr;
}
/*
* Write a backup block if needed when we are setting a hint. Note that
* this may be called for a variety of page types, not just heaps.
*
* Deciding the "if needed" part is delicate and requires us to either
* grab WALInsertLock or check the info_lck spinlock. If we check the
* spinlock and it says Yes then we will need to get WALInsertLock as well,
* so the design choice here is to just go straight for the WALInsertLock
* and trust that calls to this function are minimised elsewhere.
*
* Callable while holding just share lock on the buffer content.
*
* Possible that multiple concurrent backends could attempt to write
* WAL records. In that case, more than one backup block may be recorded
* though that isn't important to the outcome and the backup blocks are
* likely to be identical anyway.
*/
/* Arbitrary non-zero payload so the hint record is never zero-length. */
#define XLOG_HINT_WATERMARK 13579

/*
 * Build and insert an XLOG_HINT record covering "buffer".
 *
 * The record body carries only the small watermark constant (zero-length
 * records are not allowed); the useful content is the backup block that
 * XLogInsert() may attach for the buffer.  Returns the LSN of the inserted
 * record, or InvalidXLogRecPtr if XLogInsert() determined that no backup
 * block (and hence no record) was needed.
 */
XLogRecPtr
XLogSaveBufferForHint(Buffer buffer)
{
	int			watermark = XLOG_HINT_WATERMARK;
	XLogRecData rdata[2];

	/* First chunk: the watermark, so the record has a non-empty body. */
	rdata[0].data = (char *) &watermark;
	rdata[0].len = sizeof(int);
	rdata[0].buffer = InvalidBuffer;
	rdata[0].buffer_std = false;
	rdata[0].next = &rdata[1];

	/* Second chunk: no data, just a reference to the hinted buffer. */
	rdata[1].data = NULL;
	rdata[1].len = 0;
	rdata[1].buffer = buffer;
	rdata[1].buffer_std = true;
	rdata[1].next = NULL;

	return XLogInsert(RM_XLOG_ID, XLOG_HINT, rdata);
}
/*
* XLOG resource manager's routines
*
......@@ -9523,6 +9582,9 @@ xlog_redo(XLogRecPtr beginLoc __attribute__((unused)), XLogRecPtr lsn __attribut
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
/* Backup blocks are not used in most xlog records */
Assert(info == XLOG_HINT || !(record->xl_info & XLR_BKP_BLOCK_MASK));
if (info == XLOG_NEXTOID)
{
Oid nextOid;
......@@ -9649,6 +9711,34 @@ xlog_redo(XLogRecPtr beginLoc __attribute__((unused)), XLogRecPtr lsn __attribut
{
/* nothing to do here */
}
else if (info == XLOG_HINT)
{
#ifdef USE_ASSERT_CHECKING
int *watermark = (int *) XLogRecGetData(record);
#endif
/* Check the watermark is correct for the hint record */
Assert(*watermark == XLOG_HINT_WATERMARK);
/* Backup blocks must be present for smgr hint records */
Assert(record->xl_info & XLR_BKP_BLOCK_MASK);
/*
* Hint records have no information that needs to be replayed.
* The sole purpose of them is to ensure that a hint bit does
* not cause a checksum invalidation if a hint bit write should
* cause a torn page. So the body of the record is empty but
* there must be one backup block.
*
* Since the only change in the backup block is a hint bit,
* there is no conflict with Hot Standby.
*
* This also means there is no corresponding API call for this,
* so an smgr implementation has no need to implement anything.
* Which means nothing is needed in md.c etc
*/
RestoreBkpBlocks(record, lsn);
}
else if (info == XLOG_BACKUP_END)
{
XLogRecPtr startpoint;
......
......@@ -1486,7 +1486,7 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
SetBufferCommitInfoNeedsSave(*buf);
MarkBufferDirtyHint(*buf);
}
seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
......
......@@ -10272,6 +10272,8 @@ copy_buffer_pool_data(Relation rel, SMgrRelation dst,
smgrread(src, blkno, buf);
PageSetChecksumInplace(page, blkno);
/* XLOG stuff */
if (useWal)
{
......
......@@ -86,6 +86,7 @@ static bool IsForInput;
/* local state for LockBufferForCleanup */
static volatile BufferDesc *PinCountWaitBuf = NULL;
static XLogRecPtr InvalidXLogRecPtr = {0, 0};
static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf,
bool isTemp, BlockNumber blockNum, bool zeroPage,
......@@ -373,6 +374,7 @@ ReadBuffer_common(SMgrRelation reln,
{
/* new buffers are zero-filled */
MemSet((char *) bufBlock, 0, BLCKSZ);
/* don't set checksum for all-zero page */
smgrextend(reln, blockNum, (char *) bufBlock,
isTemp);
}
......@@ -387,20 +389,20 @@ ReadBuffer_common(SMgrRelation reln,
else
smgrread(reln, blockNum, (char *) bufBlock);
/* check for garbage data */
if (!PageHeaderIsValid((PageHeader) bufBlock))
if (!PageIsVerified((Page) bufBlock, blockNum))
{
if (zero_damaged_pages)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page header in block %u of relation %s; zeroing out page",
errmsg("invalid page in block %u of relation %s; zeroing out page",
blockNum, relpath(reln->smgr_rnode))));
MemSet((char *) bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page header in block %u of relation %s",
errmsg("invalid page in block %u of relation %s",
blockNum, relpath(reln->smgr_rnode)),
errSendAlert(true)));
}
......@@ -579,14 +581,23 @@ BufferAlloc(SMgrRelation smgr,
* victim. We need lock to inspect the page LSN, so this
* can't be done inside StrategyGetBuffer.
*/
if (strategy != NULL &&
XLogNeedsFlush(BufferGetLSN(buf)) &&
StrategyRejectBuffer(strategy, buf))
if (strategy != NULL)
{
/* Drop lock/pin and loop around for another buffer */
ReleaseContentLock(buf);
UnpinBuffer(buf, true);
continue;
XLogRecPtr lsn;
/* Read the LSN while holding buffer header lock */
LockBufHdr(buf);
lsn = BufferGetLSN(buf);
UnlockBufHdr(buf);
if (XLogNeedsFlush(lsn) &&
StrategyRejectBuffer(strategy, buf))
{
/* Drop lock/pin and loop around for another buffer */
LWLockRelease(buf->content_lock);
UnpinBuffer(buf, true);
continue;
}
}
/* OK, do the I/O */
......@@ -1924,6 +1935,8 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
XLogRecPtr recptr;
ErrorContextCallback errcontext;
XLogRecPtr GistXLogRecPtrForTemp = {1, 1}; /* Magic GIST value */
Block bufBlock;
char *bufToWrite;
/*
* Acquire the buffer's io_in_progress lock. If StartBufferIO returns
......@@ -1943,12 +1956,23 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
if (reln == NULL)
reln = smgropen(buf->tag.rnode);
LockBufHdr(buf);
/*
* Run PageGetLSN while holding header lock, since we don't have the
* buffer locked exclusively in all cases.
*/
recptr = BufferGetLSN(buf);
/* To check if block content changes while flushing. - vadim 01/17/97 */
buf->flags &= ~BM_JUST_DIRTIED;
UnlockBufHdr(buf);
/*
* Force XLOG flush up to buffer's LSN. This implements the basic WAL
* rule that log updates must hit disk before any of the data-file changes
* they describe do.
*/
recptr = BufferGetLSN(buf);
if (recptr.xlogid != GistXLogRecPtrForTemp.xlogid ||
recptr.xrecoff != GistXLogRecPtrForTemp.xrecoff)
{
......@@ -1961,14 +1985,16 @@ FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
* we have the io_in_progress lock.
*/
/* To check if block content changes while flushing. - vadim 01/17/97 */
LockBufHdr(buf);
buf->flags &= ~BM_JUST_DIRTIED;
UnlockBufHdr(buf);
bufBlock = BufHdrGetBlock(buf);
bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
/*
* bufToWrite is either the shared buffer or a copy, as appropriate.
*/
smgrwrite(reln,
buf->tag.blockNum,
(char *) BufHdrGetBlock(buf),
bufToWrite,
false);
BufferFlushCount++;
......@@ -2079,6 +2105,34 @@ RelationTruncate(Relation rel, BlockNumber nblocks, bool markPersistentAsPhysica
}
}
/*
 * BufferGetLSNAtomic
 *		Return the LSN of a buffer's page, reading it under the buffer
 *		header spinlock so that callers holding only a share lock on the
 *		buffer content still see a consistent value.
 */
XLogRecPtr
BufferGetLSNAtomic(Buffer buffer)
{
	char	   *page = BufferGetPage(buffer);
	volatile BufferDesc *desc = &BufferDescriptors[buffer - 1];
	XLogRecPtr	result;

	/* Local buffers are backend-private, so no locking is required. */
	if (BufferIsLocal(buffer))
		return PageGetLSN(page);

	/* Make sure we've got a real buffer, and that we hold a pin on it. */
	Assert(BufferIsValid(buffer));
	Assert(BufferIsPinned(buffer));

	LockBufHdr(desc);
	result = PageGetLSN(page);
	UnlockBufHdr(desc);

	return result;
}
/* ---------------------------------------------------------------------
* DropRelFileNodeBuffers
*
......@@ -2259,6 +2313,9 @@ FlushRelationBuffers(Relation rel)
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
{
ErrorContextCallback errcontext;
Page localpage;
localpage = (char *) LocalBufHdrGetBlock(bufHdr);
MIRROREDLOCK_BUFMGR_DECLARE;
......@@ -2271,10 +2328,12 @@ FlushRelationBuffers(Relation rel)
// -------- MirroredLock ----------
// UNDONE: Unfortunately, I think we write temp relations to the mirror...
MIRROREDLOCK_BUFMGR_LOCK;
PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
smgrwrite(rel->rd_smgr,
bufHdr->tag.blockNum,
(char *) LocalBufHdrGetBlock(bufHdr),
localpage,
rel->rd_istemp);
MIRROREDLOCK_BUFMGR_UNLOCK;
......@@ -2426,26 +2485,29 @@ IncrBufferRefCount(Buffer buffer)
PrivateRefCount[buffer - 1]++;
}
/*
* SetBufferCommitInfoNeedsSave
* MarkBufferDirtyHint
*
* Mark a buffer dirty when we have updated tuple commit-status bits in it.
* Mark a buffer dirty for non-critical changes.
*
* This is essentially the same as MarkBufferDirty, except that the caller
* might have only share-lock instead of exclusive-lock on the buffer's
* content lock. We preserve the distinction mainly as a way of documenting
* that the caller has not made a critical data change --- the status-bit
* update could be redone by someone else just as easily. Therefore, no WAL
* log record need be generated, whereas calls to MarkBufferDirty really ought
* to be associated with a WAL-entry-creating action.
* This is essentially the same as MarkBufferDirty, except:
*
* 1. The caller does not write WAL; so if checksums are enabled, we may need
* to write an XLOG_HINT WAL record to protect against torn pages.
* 2. The caller might have only share-lock instead of exclusive-lock on the
* buffer's content lock.
* 3. This function does not guarantee that the buffer is always marked dirty
* (due to a race condition), so it cannot be used for important changes.
*/
void
SetBufferCommitInfoNeedsSave(Buffer buffer)
MarkBufferDirtyHint(Buffer buffer)
{
volatile BufferDesc *bufHdr;
Page page = BufferGetPage(buffer);
if (!BufferIsValid(buffer))
elog(ERROR, "bad buffer id: %d", buffer);
elog(ERROR, "bad buffer ID: %d", buffer);
if (BufferIsLocal(buffer))
{
......@@ -2462,24 +2524,103 @@ SetBufferCommitInfoNeedsSave(Buffer buffer)
/*
* This routine might get called many times on the same page, if we are
* making the first scan after commit of an xact that added/deleted many
* tuples. So, be as quick as we can if the buffer is already dirty. We
* do this by not acquiring spinlock if it looks like the status bits are
* already OK. (Note it is okay if someone else clears BM_JUST_DIRTIED
* immediately after we look, because the buffer content update is already
* done and will be reflected in the I/O.)
* tuples. So, be as quick as we can if the buffer is already dirty. We do
* this by not acquiring spinlock if it looks like the status bits are
* already set. Since we make this test unlocked, there's a chance we
* might fail to notice that the flags have just been cleared, and failed
* to reset them, due to memory-ordering issues. But since this function
* is only intended to be used in cases where failing to write out the data
* would be harmless anyway, it doesn't really matter.
*/
if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
(BM_DIRTY | BM_JUST_DIRTIED))
{
XLogRecPtr lsn = InvalidXLogRecPtr;
bool dirtied = false;
#if 0
/*
* If checksums are enabled, and the buffer is permanent, then a full
* page image may be required even for some hint bit updates to protect
* against torn pages. This full page image is only necessary if the
* hint bit update is the first change to the page since the last
* checkpoint.
*
* We don't check full_page_writes here because that logic is
* included when we call XLogInsert() since the value changes
* dynamically.
*/
if (DataChecksumsEnabled())
{
/*
* If we're in recovery we cannot dirty a page because of a hint.
* We can set the hint, just not dirty the page as a result so
* the hint is lost when we evict the page or shutdown.
*
* See src/backend/storage/page/README for longer discussion.
*/
if (RecoveryInProgress())
return;
/*
* If the block is already dirty because we either made a change
* or set a hint already, then we don't need to write a full page
* image. Note that aggressive cleaning of blocks
* dirtied by hint bit setting would increase the call rate.
* Bulk setting of hint bits would reduce the call rate...
*
* We must issue the WAL record before we mark the buffer dirty.
* Otherwise we might write the page before we write the WAL.
* That causes a race condition, since a checkpoint might occur
* between writing the WAL record and marking the buffer dirty.
* We solve that with a kluge, but one that is already in use
* during transaction commit to prevent race conditions.
* Basically, we simply prevent the checkpoint WAL record from
* being written until we have marked the buffer dirty. We don't
* start the checkpoint flush until we have marked dirty, so our
* checkpoint must flush the change to disk successfully or the
* checkpoint never gets written, so crash recovery will fix.
*
* It's possible we may enter here without an xid, so it is
* essential that CreateCheckpoint waits for virtual transactions
* rather than full transactionids.
*/
lsn = XLogSaveBufferForHint(buffer);
}
#endif
LockBufHdr(bufHdr);
Assert(bufHdr->refcount > 0);
if (!(bufHdr->flags & BM_DIRTY) && VacuumCostActive)
VacuumCostBalance += VacuumCostPageDirty;
if (!(bufHdr->flags & BM_DIRTY))
{
dirtied = true; /* Means "will be dirtied by this action" */
/*
* Set the page LSN if we wrote a backup block. We aren't
* supposed to set this when only holding a share lock but
* as long as we serialise it somehow we're OK. We choose to
* set LSN while holding the buffer header lock, which causes
* any reader of an LSN who holds only a share lock to also
* obtain a buffer header lock before using PageGetLSN().
* Fortunately, thats not too many places.
*
* If checksums are enabled, you might think we should reset the
* checksum here. That will happen when the page is written
* sometime later in this checkpoint cycle.
*/
if (!XLogRecPtrIsInvalid(lsn))
PageSetLSN(page, lsn);
}
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
UnlockBufHdr(bufHdr);
if (dirtied)
{
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageDirty;
}
}
}
/*
* Release buffer content locks for shared buffers.
*
......
......@@ -157,7 +157,8 @@ LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
*/
if (bufHdr->flags & BM_DIRTY)
{
SMgrRelation oreln;
SMgrRelation oreln;
Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
/* Find smgr relation for buffer */
oreln = smgropen(bufHdr->tag.rnode);
......@@ -166,10 +167,12 @@ LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
// UNDONE: Unfortunately, I think we write temp relations to the mirror...
LWLockAcquire(MirroredLock, LW_SHARED);
PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
/* And write... */
smgrwrite(oreln,
bufHdr->tag.blockNum,
(char *) LocalBufHdrGetBlock(bufHdr),
localpage,
true);
LWLockRelease(MirroredLock);
......
......@@ -16,7 +16,15 @@
#include "access/htup.h"
#include "storage/bufpage.h"
#include "access/xlog.h"
#include "utils/pg_crc.h"
bool ignore_checksum_failure = false;
static char pageCopyData[BLCKSZ]; /* for checksum calculation */
static Page pageCopy = pageCopyData;
static uint16 PageCalcChecksum16(Page page, BlockNumber blkno);
/* ----------------------------------------------------------------
* Page support functions
......@@ -26,6 +34,8 @@
/*
* PageInit
* Initializes the contents of a page.
* Note that we don't calculate an initial checksum here; that's not done
* until it's time to write.
*/
void
PageInit(Page page, Size pageSize, Size specialSize)
......@@ -40,7 +50,7 @@ PageInit(Page page, Size pageSize, Size specialSize)
/* Make sure all fields of page are zero, as well as unused space */
MemSet(p, 0, pageSize);
/* p->pd_flags = 0; done by above MemSet */
p->pd_flags = 0;
p->pd_lower = SizeOfPageHeaderData;
p->pd_upper = pageSize - specialSize;
p->pd_special = pageSize - specialSize;
......@@ -50,8 +60,8 @@ PageInit(Page page, Size pageSize, Size specialSize)
/*
* PageHeaderIsValid
* Check that the header fields of a page appear valid.
* PageIsVerified
* Check that the page header and checksum (if any) appear valid.
*
* This is called when a page has just been read in from disk. The idea is
* to cheaply detect trashed pages before we go nuts following bogus item
......@@ -68,30 +78,77 @@ PageInit(Page page, Size pageSize, Size specialSize)
* will clean up such a page and make it usable.
*/
bool
PageHeaderIsValid(PageHeader page)
PageIsVerified(Page page, BlockNumber blkno)
{
PageHeader p = (PageHeader) page;
char *pagebytes;
int i;
bool checksum_failure = false;
bool header_sane = false;
bool all_zeroes = false;
uint16 checksum;
/* Check normal case */
if (PageGetPageSize(page) == BLCKSZ &&
PageGetPageLayoutVersion(page) == PG_PAGE_LAYOUT_VERSION &&
(page->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
page->pd_lower >= SizeOfPageHeaderData &&
page->pd_lower <= page->pd_upper &&
page->pd_upper <= page->pd_special &&
page->pd_special <= BLCKSZ &&
page->pd_special == MAXALIGN(page->pd_special))
return true;
/*
* Don't verify page data unless the page passes basic non-zero test
*/
if (!PageIsNew(page))
{
if (DataChecksumsEnabled())
{
checksum = PageCalcChecksum16(page, blkno);
if (checksum != p->pd_checksum)
checksum_failure = true;
}
/*
* The following checks don't prove the header is correct,
* only that it looks sane enough to allow into the buffer pool.
* Later usage of the block can still reveal problems,
* which is why we offer the checksum option.
*/
if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
p->pd_lower <= p->pd_upper &&
p->pd_upper <= p->pd_special &&
p->pd_special <= BLCKSZ &&
p->pd_special == MAXALIGN(p->pd_special))
header_sane = true;
if (header_sane && !checksum_failure)
return true;
}
/* Check all-zeroes case */
all_zeroes = true;
pagebytes = (char *) page;
for (i = 0; i < BLCKSZ; i++)
{
if (pagebytes[i] != 0)
return false;
{
all_zeroes = false;
break;
}
}
if (all_zeroes)
return true;
/*
* Throw a WARNING if the checksum fails, but only after we've checked for
* the all-zeroes case.
*/
if (checksum_failure)
{
ereport(WARNING,
(ERRCODE_DATA_CORRUPTED,
errmsg("page verification failed, calculated checksum %u but expected %u",
checksum, p->pd_checksum)));
if (header_sane && ignore_checksum_failure)
return true;
}
return true;
return false;
}
......@@ -808,3 +865,98 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
pfree(itemidbase);
}
/*
 * PageSetChecksumCopy
 *
 * Prepare a page from shared buffers for writing, returning a pointer to
 * the block image that should actually be written out.
 *
 * If checksums are disabled, or the page is still new/uninitialized, the
 * input page itself is returned unchanged.  Otherwise the page is copied
 * into a static buffer and the checksum is computed on the copy, so that
 * concurrent hint-bit updates by other backends cannot make the written
 * checksum disagree with the written contents.
 *
 * The returned pointer may refer to statically-allocated memory, so the
 * caller must write it immediately and must not refer to it again.
 */
char *
PageSetChecksumCopy(Page page, BlockNumber blkno)
{
	/* Nothing to do unless the page is initialized and checksums are on. */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return (char *) page;

	/*
	 * Copy first, then checksum the copy.  Other backends may set hint
	 * bits on the shared page while we write; it doesn't matter whether
	 * the copy captures those hints, as long as the page image and the
	 * checksum we write out agree with each other.
	 */
	memcpy((char *) pageCopy, (char *) page, BLCKSZ);
	PageSetChecksumInplace(pageCopy, blkno);

	return (char *) pageCopy;
}
/*
 * Set checksum for page in private memory.
 *
 * Simpler sibling of PageSetChecksumCopy(): the page is private to this
 * backend, so no defensive copy is needed before computing the checksum.
 * Keeping the two entry points separate makes call sites self-documenting
 * and keeps verification-specific code to a minimum.
 */
void
PageSetChecksumInplace(Page page, BlockNumber blkno)
{
	PageHeader	phdr = (PageHeader) page;

	/* Uninitialized pages and checksum-less clusters are left alone. */
	if (PageIsNew(page) || !DataChecksumsEnabled())
		return;

	phdr->pd_checksum = PageCalcChecksum16(page, blkno);
}
/*
 * Calculate the 16-bit checksum for a PostgreSQL page.
 *
 * The checksum covers the block number (so that a block transposed to a
 * different location is detected), the page header minus the pd_checksum
 * field itself, and the page data.  Note that on a verification failure we
 * cannot distinguish a transposed block from direct on-disk corruption, but
 * detecting transposition at all is better than ignoring it.
 *
 * The WAL CRC-32 is computed and then truncated to 16 bits to fit the
 * pd_checksum field.
 */
static uint16
PageCalcChecksum16(Page page, BlockNumber blkno)
{
	PageHeader	phdr = (PageHeader) page;
	pg_crc32	sum;

	/* New (all-zeroes) pages carry no checksum; callers must filter them. */
	Assert(!PageIsNew(page));

	INIT_CRC32C(sum);

	/* Seed with the block number to catch whole-block transposition. */
	COMP_CRC32C(sum, &blkno, sizeof(blkno));

	/* The LSN is always the first field of the page header. */
	COMP_CRC32C(sum, page, sizeof(phdr->pd_lsn));

	/*
	 * Everything after pd_checksum, which immediately follows pd_lsn and is
	 * excluded so the stored checksum does not feed into its own value.
	 */
	COMP_CRC32C(sum, page + sizeof(phdr->pd_lsn) + sizeof(phdr->pd_checksum),
				BLCKSZ - sizeof(phdr->pd_lsn) - sizeof(phdr->pd_checksum));

	FIN_CRC32C(sum);

	/* Truncate the 32-bit CRC to the 16 bits pd_checksum can hold. */
	return (uint16) sum;
}
......@@ -113,6 +113,7 @@ extern int CommitDelay;
extern int CommitSiblings;
extern char *default_tablespace;
extern char *temp_tablespaces;
extern bool ignore_checksum_failure;
extern bool synchronize_seqscans;
extern bool fullPageWrites;
extern int ssl_renegotiation_limit;
......@@ -621,6 +622,21 @@ static struct config_bool ConfigureNamesBool[] =
&XactSyncCommit,
true, NULL, NULL
},
{
{"ignore_checksum_failure", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Continues processing after a checksum failure."),
gettext_noop("Detection of a checksum failure normally causes PostgreSQL to "
"report an error, aborting the current transaction. Setting "
"ignore_checksum_failure to true causes the system to ignore the failure "
"(but still report a warning), and continue processing. This "
"behavior could cause crashes or other serious problems. Only "
"has an effect if checksums are enabled."),
GUC_NOT_IN_SAMPLE
},
&ignore_checksum_failure,
false,
NULL, NULL, NULL
},
{
{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Continues processing past damaged page headers."),
......
......@@ -6,7 +6,7 @@
* NOTE: all the HeapTupleSatisfies routines will update the tuple's
* "hint" status bits if we see that the inserting or deleting transaction
* has now committed or aborted (and it is safe to set the hint bits).
* If the hint bits are changed, SetBufferCommitInfoNeedsSave is called on
* If the hint bits are changed, MarkBufferDirtyHint is called on
* the passed-in buffer. The caller must hold not only a pin, but at least
* shared buffer content lock on the buffer containing the tuple.
*
......@@ -131,7 +131,7 @@ markDirty(Buffer buffer, Relation relation, HeapTupleHeader tuple, bool isXmin)
if (!gp_disable_tuple_hints)
{
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
return;
}
......@@ -141,14 +141,14 @@ markDirty(Buffer buffer, Relation relation, HeapTupleHeader tuple, bool isXmin)
*/
if (relation == NULL)
{
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
return;
}
if (relation->rd_issyscat)
{
/* Assume we want to always mark the buffer dirty */
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
return;
}
......@@ -162,7 +162,7 @@ markDirty(Buffer buffer, Relation relation, HeapTupleHeader tuple, bool isXmin)
if (xid == InvalidTransactionId)
{
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
return;
}
......@@ -171,7 +171,7 @@ markDirty(Buffer buffer, Relation relation, HeapTupleHeader tuple, bool isXmin)
*/
if (CLOGTransactionIsOld(xid))
{
SetBufferCommitInfoNeedsSave(buffer);
MarkBufferDirtyHint(buffer);
return;
}
}
......
......@@ -107,6 +107,7 @@ static char *backend_output = DEVNULL;
*/
static bool forMirrorOnly = false;
static bool show_setting = false;
static bool data_checksums = false;
static char *xlog_dir = "";
/**
......@@ -1546,8 +1547,10 @@ bootstrap_template1(char *short_version)
unsetenv("PGCLIENTENCODING");
snprintf(cmd, sizeof(cmd),
"\"%s\" --boot -x1 %s -c gp_before_persistence_work=on %s",
backend_exec, boot_options, talkargs);
"\"%s\" --boot -x1 %s %s -c gp_before_persistence_work=on %s",
backend_exec,
data_checksums ? "-k" : "",
boot_options, talkargs);
PG_CMD_OPEN;
......@@ -2814,6 +2817,7 @@ usage(const char *progname)
printf(_("\nLess commonly used options:\n"));
printf(_(" -d, --debug generate lots of debugging output\n"));
printf(_(" -s, --show show internal settings\n"));
printf(_(" -k, --data-checksums data page checksums\n"));
printf(_(" -L DIRECTORY where to find the input files\n"));
printf(_(" -n, --noclean do not clean up after errors\n"));
printf(_(" -m, --formirror only create data needed to start the backend in mirror mode\n"));
......@@ -2856,6 +2860,7 @@ main(int argc, char *argv[])
{"show", no_argument, NULL, 's'},
{"noclean", no_argument, NULL, 'n'},
{"xlogdir", required_argument, NULL, 'X'},
{"data-checksums", no_argument, NULL, 'k'},
{NULL, 0, NULL, 0}
};
......@@ -2919,7 +2924,7 @@ main(int argc, char *argv[])
/* process command-line options */
while ((c = getopt_long(argc, argv, "dD:E:L:mnU:WA:sT:X:", long_options, &option_index)) != -1)
while ((c = getopt_long(argc, argv, "dD:E:kL:mnU:WA:sT:X:", long_options, &option_index)) != -1)
{
const char *optname;
char shortopt[2];
......@@ -2968,6 +2973,9 @@ main(int argc, char *argv[])
noclean = true;
printf(_("Running in noclean mode. Mistakes will not be cleaned up.\n"));
break;
case 'k':
data_checksums = true;
break;
case 'L':
share_path = xstrdup(optarg);
break;
......@@ -3413,6 +3421,11 @@ main(int argc, char *argv[])
printf(_("The default text search configuration will be set to \"%s\".\n"),
default_text_search_config);
if (data_checksums)
printf(_("Data page checksums are enabled.\n"));
else
printf(_("Data page checksums are disabled.\n"));
printf("\n");
umask(077);
......
......@@ -232,6 +232,5 @@ main(int argc, char *argv[])
ControlFile.lc_ctype);
printf(_("Data page checksum version: %u\n"),
ControlFile.data_checksum_version);
return 0;
}
......@@ -246,6 +246,7 @@ extern void XLogFileRepFlushCache(
XLogRecPtr *lastChangeTrackingEndLoc);
extern void XLogGetLastRemoved(uint32 *log, uint32 *seg);
extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer);
extern void xlog_redo(XLogRecPtr beginLoc __attribute__((unused)), XLogRecPtr lsn __attribute__((unused)), XLogRecord *record);
extern void xlog_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record);
......
......@@ -58,6 +58,7 @@ typedef struct CheckPoint
#define XLOG_SWITCH 0x40
#define XLOG_BACKUP_END 0x50
#define XLOG_NEXTRELFILENODE 0x60
#define XLOG_HINT 0x70
/* System status indicator */
......
......@@ -318,6 +318,7 @@ extern void FlushDatabaseBuffers(Oid dbid);
extern void DropRelFileNodeBuffers(RelFileNode rnode, bool istemp,
BlockNumber firstDelBlock);
extern void DropDatabaseBuffers(Oid tbpoid, Oid dbid);
extern XLogRecPtr BufferGetLSNAtomic(Buffer buffer);
#ifdef NOT_USED
extern void PrintPinnedBufs(void);
......@@ -325,7 +326,7 @@ extern void PrintPinnedBufs(void);
extern Size BufferShmemSize(void);
extern RelFileNode BufferGetFileNode(Buffer buffer);
extern void SetBufferCommitInfoNeedsSave(Buffer buffer);
extern void MarkBufferDirtyHint(Buffer buffer);
extern void UnlockBuffers(void);
extern void LockBuffer(Buffer buffer, int mode);
......
......@@ -15,6 +15,7 @@
#define BUFPAGE_H
#include "storage/bufmgr.h"
#include "storage/block.h"
#include "storage/item.h"
#include "storage/off.h"
#include "access/xlog.h"
......@@ -402,7 +403,7 @@ do { \
*/
extern void PageInit(Page page, Size pageSize, Size specialSize);
extern bool PageHeaderIsValid(PageHeader page);
extern bool PageIsVerified(Page page, BlockNumber blkno);
extern OffsetNumber PageAddItem(Page page, Item item, Size size,
OffsetNumber offsetNumber, bool overwrite, bool is_heap);
extern Page PageGetTempPage(Page page, Size specialSize);
......@@ -413,5 +414,7 @@ extern Size PageGetExactFreeSpace(Page page);
extern Size PageGetHeapFreeSpace(Page page);
extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
#endif /* BUFPAGE_H */
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册