未验证 提交 d45c7bb3 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15496 from taosdata/feature/stream

enh(wal): remove file failure handling
......@@ -77,11 +77,11 @@ typedef struct {
} SWalSyncInfo;
typedef struct {
int8_t protoVer;
int64_t version;
int16_t msgType;
int64_t ingestTs;
int32_t bodyLen;
int64_t ingestTs; // not implemented
int16_t msgType;
int8_t protoVer;
// sync meta
SWalSyncInfo syncMeta;
......
......@@ -139,7 +139,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern;
regex_t idxRegPattern;
SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo));
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
......@@ -159,7 +159,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo fileInfo;
memset(&fileInfo, -1, sizeof(SWalFileInfo));
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
taosArrayPush(pLogInfoArray, &fileInfo);
taosArrayPush(actualLog, &fileInfo);
}
}
......@@ -167,10 +167,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
regfree(&logRegPattern);
regfree(&idxRegPattern);
taosArraySort(pLogInfoArray, compareWalFileInfo);
taosArraySort(actualLog, compareWalFileInfo);
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(pLogInfoArray);
int actualFileNum = taosArrayGetSize(actualLog);
#if 0
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
......@@ -196,11 +196,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) {
for (int i = metaFileNum; i < actualFileNum; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo);
}
}
taosArrayDestroy(pLogInfoArray);
taosArrayDestroy(actualLog);
pWal->writeCur = actualFileNum - 1;
if (actualFileNum > 0) {
......@@ -221,7 +221,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal);
if (code < 0) {
taosArrayDestroy(pLogInfoArray);
taosArrayDestroy(actualLog);
return -1;
}
}
......
......@@ -423,37 +423,38 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
return 0;
}
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver);
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
wDebug("vgId:%d wal start to read ver %ld", pReader->pWal->cfg.vgId, ver);
int64_t contLen;
int32_t code;
bool seeked = false;
if (pRead->pWal->vers.firstVer == -1) {
if (pReader->pWal->vers.firstVer == -1) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (pRead->curInvalid || pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
if (pReader->curInvalid || pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
return -1;
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver);
walReadSeekVerImpl(pReader, ver);
seeked = true;
continue;
} else {
......@@ -467,26 +468,26 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
}
contLen = walValidHeadCksum(pRead->pHead);
if (contLen != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
code = walValidHeadCksum(pReader->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen);
if (pReader->capacity < pReader->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.bodyLen;
pReader->pHead = ptr;
pReader->capacity = pReader->pHead->head.bodyLen;
}
if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) {
if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen)) !=
pReader->pHead->head.bodyLen) {
if (contLen < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else {
......@@ -496,25 +497,28 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return -1;
}
if (pRead->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pRead->curInvalid = 1;
if (pReader->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
pReader->pHead->head.version, ver);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
contLen = walValidBodyCksum(pRead->pHead);
if (contLen != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
code = walValidBodyCksum(pReader->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1;
uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
uint32_t logCkSum = pReader->pHead->cksumBody;
wError("checksum written into log: %u, checksum calculated: %u", logCkSum, readCkSum);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
pRead->curVersion++;
pReader->curVersion++;
return 0;
}
......@@ -289,18 +289,25 @@ int32_t walEndSnapshot(SWal *pWal) {
newTotSize -= iter->fileSize;
}
}
char fnameStr[WAL_FILE_LEN];
int32_t actualDelete = 0;
char fnameStr[WAL_FILE_LEN];
// remove file
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META;
}
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0);
}
actualDelete++;
}
UPDATE_META:
// make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
taosArrayPopFrontBatch(pWal->fileInfoSet, actualDelete);
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1;
pWal->vers.firstVer = -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册