提交 e8c990fd 编写于 作者: J Jimmy Yih

Validate the previous free TID in gp_persistent_relation_node.

Previously, previous free TID validation was done under the GUC
persistent_integrity_checks. This commit extracts the previous free TID
validation into another GUC validate_previous_free_tid and is enabled by
default. If the validation detects a corruption in the free TID list, we
will now switch to a new free TID list and leave the corrupted one detached
for cleanup during persistent table rebuild or during crash recovery.

Authors: Jimmy Yih and Abhijit Subramanya
上级 308f0c8c
......@@ -1256,15 +1256,93 @@ void PersistentStore_ReadTuple(
pfree(nulls);
}
static bool PersistentStore_GetFreeTuple(
/*
* Check if the free TID is valid. If not, the free list is corrupted and we
* pretend there are no free tuples to reset the free list. The corrupted free
* list will be detached and cleaned during recovery or pt rebuild.
*/
static bool PersistentStore_ValidateFreeTID(
PersistentStoreData *storeData,
PersistentStoreSharedData *storeSharedData,
ItemPointer freeTid)
ItemPointer previousFreeTid)
{
Datum *values;
HeapTuple tupleCopy;
int64 persistentSerialNum;
bool tidIsValid = true;
if (storeSharedData->maxFreeOrderNum <= 0)
return true; /* No tuple to check */
values = (Datum*)palloc(storeData->numAttributes * sizeof(Datum));
PersistentStore_ReadTuple(
storeData,
storeSharedData,
&storeSharedData->freeTid,
values,
&tupleCopy);
PersistentStore_ExtractOurTupleData(
storeData,
values,
&persistentSerialNum,
previousFreeTid);
if (storeSharedData->maxFreeOrderNum == 1 && ItemPointerCompare(previousFreeTid, &storeSharedData->freeTid) != 0)
{
tidIsValid = false;
ereport(WARNING,
(errmsg("integrity check for PT freeTid failed"),
errdetail("expected to have previous FreeTID at %s equal to itself (found %s, %s)",
ItemPointerToString(&storeSharedData->freeTid),
ItemPointerToString2(previousFreeTid),
storeData->tableName)));
}
if (PersistentStore_IsZeroTid(previousFreeTid))
{
tidIsValid = false;
ereport(WARNING,
(errmsg("integrity check for PT freeTid failed"),
errdetail("expected to have previous FreeTID at %s to be free (found %s, %s)",
ItemPointerToString(&storeSharedData->freeTid),
ItemPointerToString2(previousFreeTid),
storeData->tableName)));
}
if (persistentSerialNum != storeSharedData->maxFreeOrderNum)
{
tidIsValid = false;
ereport(WARNING,
(errmsg("integrity check for PT freeTid failed"),
errdetail("expected persistent store tuple at %s to have order number " INT64_FORMAT " (found " INT64_FORMAT ", '%s')",
ItemPointerToString(&storeSharedData->freeTid),
storeSharedData->maxFreeOrderNum,
persistentSerialNum,
storeData->tableName)));
}
pfree(values);
heap_freetuple(tupleCopy);
/* If the free TID is not valid, switch to a new free list here */
if (!tidIsValid)
{
ItemPointerSet(previousFreeTid, 0, 0);
storeSharedData->maxFreeOrderNum = 0;
MemSet(&storeSharedData->freeTid, 0, sizeof(ItemPointerData));
ereport(WARNING,
(errmsg("switched to new free TID list")));
}
return tidIsValid;
}
static bool PersistentStore_GetFreeTuple(
PersistentStoreData *storeData,
PersistentStoreSharedData *storeSharedData,
ItemPointer freeTid)
{
ItemPointerData previousFreeTid;
MemSet(freeTid, 0, sizeof(ItemPointerData));
......@@ -1278,7 +1356,7 @@ static bool PersistentStore_GetFreeTuple(
if (storeSharedData->maxFreeOrderNum == 0)
{
return false; // No free tuples.
return false; /* No free tuples. */
}
if (gp_persistent_skip_free_list)
......@@ -1287,50 +1365,21 @@ static bool PersistentStore_GetFreeTuple(
elog(PersistentStore_DebugPrintLevel(),
"PersistentStore_GetFreeTuple: Skipping because gp_persistent_skip_free_list GUC is ON ('%s')",
storeData->tableName);
return false; // Pretend no free tuples.
return false; /* Pretend no free tuples. */
}
Assert(storeSharedData->freeTid.ip_posid != 0);
/*
* Read the current last free tuple.
*/
values = (Datum*)palloc(storeData->numAttributes * sizeof(Datum));
PersistentStore_ReadTuple(
storeData,
storeSharedData,
&storeSharedData->freeTid,
values,
&tupleCopy);
PersistentStore_ExtractOurTupleData(
if (!PersistentStore_ValidateFreeTID(
storeData,
values,
&persistentSerialNum,
&previousFreeTid);
if (PersistentStore_IsZeroTid(&previousFreeTid))
elog(ERROR, "Expected persistent store tuple at %s to be free ('%s')",
ItemPointerToString(&storeSharedData->freeTid),
storeData->tableName);
if (storeSharedData->maxFreeOrderNum == 1)
Insist(ItemPointerCompare(&previousFreeTid, &storeSharedData->freeTid) == 0);
if (persistentSerialNum != storeSharedData->maxFreeOrderNum)
elog(ERROR, "Expected persistent store tuple at %s to have order number " INT64_FORMAT " (found " INT64_FORMAT ", '%s')",
ItemPointerToString(&storeSharedData->freeTid),
storeSharedData->maxFreeOrderNum,
persistentSerialNum,
storeData->tableName);
storeSharedData,
&previousFreeTid))
return false;
*freeTid = storeSharedData->freeTid;
storeSharedData->maxFreeOrderNum--;
storeSharedData->freeTid = previousFreeTid;
pfree(values);
heap_freetuple(tupleCopy);
ItemPointerCopy(&previousFreeTid /* previousFreeTid set inside the ValidateFreeTID function */,
&storeSharedData->freeTid);
if (Debug_persistent_store_print)
elog(PersistentStore_DebugPrintLevel(),
......@@ -1339,46 +1388,12 @@ static bool PersistentStore_GetFreeTuple(
ItemPointerToString(&storeSharedData->freeTid),
storeData->tableName);
/*
* Lets validate and have sanity check to make sure the prevFreeTid is really free
*/
if (persistent_integrity_checks && (storeSharedData->maxFreeOrderNum > 0))
{
ItemPointerData tmpPreviousFreeTid;
/*
* Read the current last free tuple.
*/
values = (Datum*)palloc(storeData->numAttributes * sizeof(Datum));
PersistentStore_ReadTuple(
storeData,
storeSharedData,
&previousFreeTid,
values,
&tupleCopy);
PersistentStore_ExtractOurTupleData(
storeData,
values,
&persistentSerialNum,
&tmpPreviousFreeTid);
if (PersistentStore_IsZeroTid(&tmpPreviousFreeTid))
elog(ERROR, "PersistentStore_GetFreeTuple: Expected to have previous FreeTID at %s to be free ('%s')",
ItemPointerToString(&previousFreeTid),
storeData->tableName);
if (persistentSerialNum != storeSharedData->maxFreeOrderNum)
elog(ERROR, "PersistentStore_GetFreeTuple: Expected persistent store tuple at %s to have order number " INT64_FORMAT " (found " INT64_FORMAT ", '%s')",
ItemPointerToString(&previousFreeTid),
storeSharedData->maxFreeOrderNum,
persistentSerialNum,
storeData->tableName);
pfree(values);
heap_freetuple(tupleCopy);
}
if (validate_previous_free_tid &&
!PersistentStore_ValidateFreeTID(
storeData,
storeSharedData,
&previousFreeTid))
return false;
return true;
}
......@@ -1490,58 +1505,21 @@ void PersistentStore_FreeTuple(
persistentRel = (*storeData->openRel)();
storeSharedData->maxFreeOrderNum++;
if (storeSharedData->maxFreeOrderNum == 1)
prevFreeTid = storeSharedData->freeTid;
if (validate_previous_free_tid)
{
prevFreeTid = *persistentTid; // So non-zero PreviousFreeTid indicates free.
/* Let us validate and have sanity check to make sure the prevFreeTid is really free. */
ItemPointerData tmpPrevFreeTid;
PersistentStore_ValidateFreeTID(
storeData,
storeSharedData,
&tmpPrevFreeTid);
}
else
{
prevFreeTid = storeSharedData->freeTid;
/*
* Lets validate and have sanity check to make sure the prevFreeTid is really free
*/
if (persistent_integrity_checks)
{
ItemPointerData tmpPreviousFreeTid;
Datum *values;
HeapTuple tupleCopy;
int64 persistentSerialNum;
/*
* Read the current last free tuple.
*/
values = (Datum*)palloc(storeData->numAttributes * sizeof(Datum));
PersistentStore_ReadTuple(
storeData,
storeSharedData,
&prevFreeTid,
values,
&tupleCopy);
PersistentStore_ExtractOurTupleData(
storeData,
values,
&persistentSerialNum,
&tmpPreviousFreeTid);
if (PersistentStore_IsZeroTid(&tmpPreviousFreeTid))
elog(ERROR, "PersistentStore_FreeTuple: Expected to have previous FreeTID at %s to be free ('%s')",
ItemPointerToString(&prevFreeTid),
storeData->tableName);
if (persistentSerialNum != (storeSharedData->maxFreeOrderNum - 1))
elog(ERROR, "PersistentStore_FreeTuple: Expected persistent store tuple at %s to have order number " INT64_FORMAT " (found " INT64_FORMAT ", '%s')",
ItemPointerToString(&prevFreeTid),
(storeSharedData->maxFreeOrderNum - 1),
persistentSerialNum,
storeData->tableName);
storeSharedData->maxFreeOrderNum++;
if (storeSharedData->maxFreeOrderNum == 1)
ItemPointerCopy(persistentTid, &prevFreeTid); /* So non-zero PreviousFreeTid indicates free. */
pfree(values);
heap_freetuple(tupleCopy);
}
}
storeSharedData->freeTid = *persistentTid;
PersistentStore_FormTupleSetOurs(
......
......@@ -201,6 +201,7 @@ bool Debug_persistent_store_print = false;
int Debug_persistent_store_print_level = LOG;
bool Debug_persistent_bootstrap_print = false;
bool persistent_integrity_checks = true;
bool validate_previous_free_tid = true;
bool disable_persistent_diagnostic_dump = false;
bool debug_persistent_ptcat_verification = false;
bool debug_print_persistent_checks = false;
......@@ -1831,6 +1832,16 @@ struct config_bool ConfigureNamesBool_gp[] =
false, NULL, NULL
},
{
{"validate_previous_free_tid", PGC_SUSET, UNGROUPED,
gettext_noop("When set checks that the previous free TID of the current free tuple is a valid free tuple."),
NULL,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE
},
&validate_previous_free_tid,
true, NULL, NULL
},
{
{"disable_persistent_diagnostic_dump", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("When set disables printing full PT on erros."),
......
......@@ -188,6 +188,7 @@ extern bool Disable_persistent_recovery_logging;
extern bool Debug_persistent_store_print;
extern bool Debug_persistent_bootstrap_print;
extern bool persistent_integrity_checks;
extern bool validate_previous_free_tid;
extern bool disable_persistent_diagnostic_dump;
extern bool debug_persistent_ptcat_verification;
extern bool debug_print_persistent_checks;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册