未验证 提交 bddba107 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20608 from taosdata/fix/TS-2960

fix(tdb/restore): rollback journal files backward
...@@ -687,6 +687,8 @@ typedef struct SSttBlockLoadInfo { ...@@ -687,6 +687,8 @@ typedef struct SSttBlockLoadInfo {
STSchema *pSchema; STSchema *pSchema;
int16_t *colIds; int16_t *colIds;
int32_t numOfCols; int32_t numOfCols;
bool checkRemainingRow;
bool isLast;
bool sttBlockLoaded; bool sttBlockLoaded;
int32_t numOfStt; int32_t numOfStt;
......
...@@ -590,6 +590,7 @@ typedef struct { ...@@ -590,6 +590,7 @@ typedef struct {
SDataFReader **pDataFReader; SDataFReader **pDataFReader;
TSDBROW row; TSDBROW row;
bool checkRemainingRow;
SMergeTree mergeTree; SMergeTree mergeTree;
SMergeTree *pMergeTree; SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
...@@ -600,7 +601,6 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa ...@@ -600,7 +601,6 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
int nCols) { int nCols) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
bool checkRemainingRow = true;
switch (state->state) { switch (state->state) {
case SFSLASTNEXTROW_FS: case SFSLASTNEXTROW_FS:
...@@ -633,12 +633,25 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa ...@@ -633,12 +633,25 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
if (code) goto _err; if (code) goto _err;
} }
state->pLoadInfo->colIds = aCols; for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
state->pLoadInfo->numOfCols = nCols; state->pLoadInfo[i].colIds = aCols;
state->pLoadInfo[i].numOfCols = nCols;
state->pLoadInfo[i].isLast = isLast;
}
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid, tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX}, &(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true); &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
state->pMergeTree = &state->mergeTree; state->pMergeTree = &state->mergeTree;
state->state = SFSLASTNEXTROW_BLOCKROW;
}
case SFSLASTNEXTROW_BLOCKROW: {
if (nCols != state->pLoadInfo->numOfCols) {
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
state->pLoadInfo[i].numOfCols = nCols;
state->pLoadInfo[i].checkRemainingRow = state->checkRemainingRow;
}
}
bool hasVal = tMergeTreeNext(&state->mergeTree); bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) { if (!hasVal) {
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) { if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
...@@ -649,19 +662,9 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa ...@@ -649,19 +662,9 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
goto _next_fileset; goto _next_fileset;
} }
state->state = SFSLASTNEXTROW_BLOCKROW;
checkRemainingRow = false;
}
case SFSLASTNEXTROW_BLOCKROW: {
bool skipRow = false;
do {
bool hasVal = false;
state->row = tMergeTreeGetRow(&state->mergeTree); state->row = tMergeTreeGetRow(&state->mergeTree);
*ppRow = &state->row; *ppRow = &state->row;
if (nCols != state->pLoadInfo->numOfCols) {
state->pLoadInfo->numOfCols = nCols;
}
hasVal = tMergeTreeNext(&state->mergeTree);
if (TSDBROW_TS(&state->row) <= state->lastTs) { if (TSDBROW_TS(&state->row) <= state->lastTs) {
*pIgnoreEarlierTs = true; *pIgnoreEarlierTs = true;
*ppRow = NULL; *ppRow = NULL;
...@@ -671,54 +674,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa ...@@ -671,54 +674,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
*pIgnoreEarlierTs = false; *pIgnoreEarlierTs = false;
if (!hasVal) { if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
break;
} }
if (checkRemainingRow) { if (!state->checkRemainingRow) {
bool skipBlock = true; state->checkRemainingRow = true;
SBlockData *pBlockData = state->row.pBlockData;
for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
SColData *pColData = &pBlockData->aColData[colIndex];
int16_t cid = pColData->cid;
if (cid == aCols[inputColIndex]) {
if (isLast && (pColData->flag & HAS_VALUE)) {
skipBlock = false;
break;
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
skipBlock = false;
break;
}
}
}
}
/*
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
SColData *pColData = &pBlockData->aColData[colIndex];
int16_t cid = pColData->cid;
if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
if (isLast && (pColData->flag & HAS_VALUE)) {
skipBlock = false;
break;
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
skipBlock = false;
break;
}
++inputColIndex;
} }
}
*/
if (skipBlock) {
skipRow = true;
}
}
} while (skipRow);
return code; return code;
} }
default: default:
......
...@@ -504,9 +504,34 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { ...@@ -504,9 +504,34 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->iRow += step; pIter->iRow += step;
while (1) { while (1) {
bool skipBlock = false;
findNextValidRow(pIter, idStr); findNextValidRow(pIter, idStr);
if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) { if (pIter->pBlockLoadInfo->checkRemainingRow) {
skipBlock = true;
int16_t *aCols = pIter->pBlockLoadInfo->colIds;
int nCols = pIter->pBlockLoadInfo->numOfCols;
bool isLast = pIter->pBlockLoadInfo->isLast;
for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
SColData *pColData = &pBlockData->aColData[colIndex];
int16_t cid = pColData->cid;
if (cid == aCols[inputColIndex]) {
if (isLast && (pColData->flag & HAS_VALUE)) {
skipBlock = false;
break;
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
skipBlock = false;
break;
}
}
}
}
}
if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
tLDataIterNextBlock(pIter, idStr); tLDataIterNextBlock(pIter, idStr);
if (pIter->pSttBlk == NULL) { // no more data if (pIter->pSttBlk == NULL) { // no more data
goto _exit; goto _exit;
......
...@@ -947,6 +947,12 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) { ...@@ -947,6 +947,12 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
return 0; return 0;
} }
static int32_t txnIdCompareDesc(const void *pLeft, const void *pRight) {
int64_t lhs = *(int64_t *)pLeft;
int64_t rhs = *(int64_t *)pRight;
return lhs > rhs ? -1 : 1;
}
int tdbPagerRestoreJournals(SPager *pPager) { int tdbPagerRestoreJournals(SPager *pPager) {
tdbDirEntryPtr pDirEntry; tdbDirEntryPtr pDirEntry;
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName); tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
...@@ -955,23 +961,33 @@ int tdbPagerRestoreJournals(SPager *pPager) { ...@@ -955,23 +961,33 @@ int tdbPagerRestoreJournals(SPager *pPager) {
return -1; return -1;
} }
SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
while ((pDirEntry = tdbReadDir(pDir)) != NULL) { while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry)); char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) { if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
int64_t txnId = -1;
sscanf(name, TDB_MAINDB_NAME "-journal.%" PRId64, &txnId);
taosArrayPush(pTxnList, &txnId);
}
}
taosArraySort(pTxnList, txnIdCompareDesc);
for (int i = 0; i < TARRAY_SIZE(pTxnList); ++i) {
int64_t *pTxnId = taosArrayGet(pTxnList, i);
char jname[TD_PATH_MAX] = {0}; char jname[TD_PATH_MAX] = {0};
int dirLen = strlen(pPager->pEnv->dbName); int dirLen = strlen(pPager->pEnv->dbName);
memcpy(jname, pPager->pEnv->dbName, dirLen); memcpy(jname, pPager->pEnv->dbName, dirLen);
jname[dirLen] = '/'; jname[dirLen] = '/';
memcpy(jname + dirLen + 1, name, strlen(name)); sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId);
if (tdbPagerRestore(pPager, jname) < 0) { if (tdbPagerRestore(pPager, jname) < 0) {
tdbCloseDir(&pDir); tdbCloseDir(&pDir);
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name); tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);
return -1; return -1;
} }
} }
}
taosArrayDestroy(pTxnList);
tdbCloseDir(&pDir); tdbCloseDir(&pDir);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册