未验证 提交 39f38085 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9734 from taosdata/feature/tq

add wal handle meta corrupt
...@@ -71,6 +71,7 @@ extern int32_t wDebugFlag; ...@@ -71,6 +71,7 @@ extern int32_t wDebugFlag;
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1 #define WAL_CUR_FAILED 1
...@@ -98,6 +99,7 @@ typedef struct { ...@@ -98,6 +99,7 @@ typedef struct {
} SWalCfg; } SWalCfg;
typedef struct { typedef struct {
uint64_t magic;
uint32_t cksumHead; uint32_t cksumHead;
uint32_t cksumBody; uint32_t cksumBody;
SWalReadHead head; SWalReadHead head;
......
...@@ -17,11 +17,11 @@ ...@@ -17,11 +17,11 @@
#define _TD_WAL_INT_H_ #define _TD_WAL_INT_H_
#include "compare.h" #include "compare.h"
#include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcoding.h"
#include "wal.h" #include "wal.h"
#include "taoserror.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -40,6 +40,19 @@ typedef struct WalIdxEntry { ...@@ -40,6 +40,19 @@ typedef struct WalIdxEntry {
int64_t offset; int64_t offset;
} SWalIdxEntry; } SWalIdxEntry;
static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
int tlen;
tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
return 0;
}
static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
buf = taosDecodeFixedI64(buf, &pIdxEntry->ver);
buf = taosDecodeFixedI64(buf, &pIdxEntry->offset);
return buf;
}
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft; SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft;
SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight; SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight;
...@@ -130,12 +143,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); ...@@ -130,12 +143,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// meta section end // meta section end
// seek section // seek section
int walChangeFile(SWal* pWal, int64_t ver); int walChangeWrite(SWal* pWal, int64_t ver);
int walChangeFileToLast(SWal* pWal); int walSetWrite(SWal* pWal);
// seek section end // seek section end
int64_t walGetSeq(); int64_t walGetSeq();
int walSeekVer(SWal* pWal, int64_t ver); int walSeekWriteVer(SWal* pWal, int64_t ver);
int walRoll(SWal* pWal); int walRoll(SWal* pWal);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#include "cJSON.h" #include "cJSON.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tfile.h"
#include "tref.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
...@@ -34,13 +33,74 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { ...@@ -34,13 +33,74 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
} }
static inline int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL);
int sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0);
for (int i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
}
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
struct stat statbuf;
stat(fnameStr, &statbuf);
int readSize = MIN(WAL_MAX_SIZE, statbuf.st_size);
FileFd fd = taosOpenFileRead(fnameStr);
if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
uint64_t magic = WAL_MAGIC;
char* buf = malloc(readSize + 5);
if (buf == NULL) {
taosCloseFile(fd);
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
if (readSize != taosReadFile(fd, buf, readSize)) {
free(buf);
taosCloseFile(fd);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
char* found = strstr(buf, (const char*)&magic);
if (found == NULL) {
ASSERT(false);
// file has to be deleted
free(buf);
taosCloseFile(fd);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
char *another;
while((another = strstr(found + 1, (const char*)&magic)) != NULL) {
// read and validate
SWalHead *logContent = (SWalHead*)another;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = another;
}
}
taosCloseFile(fd);
SWalHead *lastEntry = (SWalHead*)found;
return lastEntry->head.version;
}
int walCheckAndRepairMeta(SWal* pWal) { int walCheckAndRepairMeta(SWal* pWal) {
// load log files, get first/snapshot/last version info // load log files, get first/snapshot/last version info
const char* logPattern = "^[0-9]+.log$"; const char* logPattern = "^[0-9]+.log$";
const char* idxPattern = "^[0-9]+.idx$"; const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern; regex_t logRegPattern;
regex_t idxRegPattern; regex_t idxRegPattern;
SArray* pLogArray = taosArrayInit(8, sizeof(int64_t)); SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo));
regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
...@@ -51,19 +111,84 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -51,19 +111,84 @@ int walCheckAndRepairMeta(SWal* pWal) {
return -1; return -1;
} }
// scan log files and build new meta
struct dirent* ent; struct dirent* ent;
while ((ent = readdir(dir)) != NULL) { while ((ent = readdir(dir)) != NULL) {
char* name = basename(ent->d_name); char* name = basename(ent->d_name);
int code = regexec(&logRegPattern, name, 0, NULL, 0); int code = regexec(&logRegPattern, name, 0, NULL, 0);
if (code == 0) { if (code == 0) {
int64_t firstVer; SWalFileInfo fileInfo;
sscanf(name, "%" PRId64 ".log", &firstVer); memset(&fileInfo, -1, sizeof(SWalFileInfo));
taosArrayPush(pLogArray, &firstVer); sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
FileFd fd = taosOpenFileRead(ent->d_name);
//get lastVer
//get size
taosArrayPush(pLogInfoArray, &fileInfo);
} }
} }
// load meta regfree(&logRegPattern);
// if not match, or meta missing regfree(&idxRegPattern);
taosArraySort(pLogInfoArray, compareWalFileInfo);
int oldSz = 0;
if (pWal->fileInfoSet) {
oldSz = taosArrayGetSize(pWal->fileInfoSet);
}
int newSz = taosArrayGetSize(pLogInfoArray);
// case 1. meta file not exist / cannot be parsed
if (pWal->fileInfoSet == NULL && newSz != 0) {
// recover fileInfo set
pWal->fileInfoSet = pLogInfoArray;
if (newSz != 0) {
// recover meta version
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
pWal->writeCur = newSz - 1;
}
// recover file size
} else if (oldSz < newSz) {
for (int i = oldSz; i < newSz; i++) {
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo);
}
pWal->writeCur = newSz - 1;
}
if (pWal->fileInfoSet && taosArrayGetSize(pWal->fileInfoSet) != 0) {
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
ASSERT(pWal->vers.lastVer != -1);
}
// case 2. versions in meta not match log
// or some log not included in meta
// (e.g. program killed)
//
// case 3. other corrupt cases
//
#if 0
int sz = taosArrayGetSize(pLogInfoArray);
for (int i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
if (i == 0 && pFileInfo->firstVer != walGetFirstVer(pWal)) {
//repair
}
if (i > 0) {
SWalFileInfo* pLastFileInfo = taosArrayGet(pLogInfoArray, i-1);
if (pLastFileInfo->lastVer != pFileInfo->firstVer) {
}
}
}
#endif
int code = walSaveMeta(pWal);
if (code < 0) {
return -1;
}
// get last version of this file
//
// rebuild meta // rebuild meta
return 0; return 0;
} }
...@@ -87,6 +212,7 @@ int walRollFileInfo(SWal* pWal) { ...@@ -87,6 +212,7 @@ int walRollFileInfo(SWal* pWal) {
// TODO: change to emplace back // TODO: change to emplace back
SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo)); SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo));
if (pNewInfo == NULL) { if (pNewInfo == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
pNewInfo->firstVer = pWal->vers.lastVer + 1; pNewInfo->firstVer = pWal->vers.lastVer + 1;
...@@ -94,7 +220,7 @@ int walRollFileInfo(SWal* pWal) { ...@@ -94,7 +220,7 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo->createTs = ts; pNewInfo->createTs = ts;
pNewInfo->closeTs = -1; pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0; pNewInfo->fileSize = 0;
taosArrayPush(pWal->fileInfoSet, pNewInfo); taosArrayPush(pArray, pNewInfo);
free(pNewInfo); free(pNewInfo);
return 0; return 0;
} }
...@@ -108,7 +234,16 @@ char* walMetaSerialize(SWal* pWal) { ...@@ -108,7 +234,16 @@ char* walMetaSerialize(SWal* pWal) {
cJSON* pFiles = cJSON_CreateArray(); cJSON* pFiles = cJSON_CreateArray();
cJSON* pField; cJSON* pField;
if (pRoot == NULL || pMeta == NULL || pFiles == NULL) { if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
// TODO if(pRoot) {
cJSON_Delete(pRoot);
}
if(pMeta) {
cJSON_Delete(pMeta);
}
if(pFiles) {
cJSON_Delete(pFiles);
}
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return NULL; return NULL;
} }
cJSON_AddItemToObject(pRoot, "meta", pMeta); cJSON_AddItemToObject(pRoot, "meta", pMeta);
...@@ -221,18 +356,18 @@ int walSaveMeta(SWal* pWal) { ...@@ -221,18 +356,18 @@ int walSaveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal); int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer + 1, fnameStr); walBuildMetaName(pWal, metaVer + 1, fnameStr);
int metaTfd = tfOpenCreateWrite(fnameStr); FileFd metaFd = taosOpenFileCreateWrite(fnameStr);
if (metaTfd < 0) { if (metaFd < 0) {
return -1; return -1;
} }
char* serialized = walMetaSerialize(pWal); char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized); int len = strlen(serialized);
if (len != tfWrite(metaTfd, serialized, len)) { if (len != taosWriteFile(metaFd, serialized, len)) {
// TODO:clean file // TODO:clean file
return -1; return -1;
} }
tfClose(metaTfd); taosCloseFile(metaFd);
// delete old file // delete old file
if (metaVer > -1) { if (metaVer > -1) {
walBuildMetaName(pWal, metaVer, fnameStr); walBuildMetaName(pWal, metaVer, fnameStr);
...@@ -247,7 +382,7 @@ int walLoadMeta(SWal* pWal) { ...@@ -247,7 +382,7 @@ int walLoadMeta(SWal* pWal) {
// find existing meta file // find existing meta file
int metaVer = walFindCurMetaVer(pWal); int metaVer = walFindCurMetaVer(pWal);
if (metaVer == -1) { if (metaVer == -1) {
return 0; return -1;
} }
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr); walBuildMetaName(pWal, metaVer, fnameStr);
...@@ -257,23 +392,20 @@ int walLoadMeta(SWal* pWal) { ...@@ -257,23 +392,20 @@ int walLoadMeta(SWal* pWal) {
int size = statbuf.st_size; int size = statbuf.st_size;
char* buf = malloc(size + 5); char* buf = malloc(size + 5);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
memset(buf, 0, size + 5); memset(buf, 0, size + 5);
int tfd = tfOpenRead(fnameStr); FileFd fd = taosOpenFileRead(fnameStr);
if (tfRead(tfd, buf, size) != size) { if (taosReadFile(fd, buf, size) != size) {
tfClose(tfd); terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(fd);
free(buf); free(buf);
return -1; return -1;
} }
// load into fileInfoSet // load into fileInfoSet
int code = walMetaDeserialize(pWal, buf); int code = walMetaDeserialize(pWal, buf);
if (code != 0) { taosCloseFile(fd);
tfClose(tfd);
free(buf); free(buf);
return -1; return code;
}
tfClose(tfd);
free(buf);
return 0;
} }
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include "tref.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
static int walSeekFilePos(SWal* pWal, int64_t ver) { static int walSeekWritePos(SWal* pWal, int64_t ver) {
int code = 0; int code = 0;
int64_t idxTfd = pWal->writeIdxTfd; int64_t idxTfd = pWal->writeIdxTfd;
...@@ -41,7 +41,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { ...@@ -41,7 +41,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
return -1; return -1;
} }
ASSERT(entry.ver == ver); ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_CUR); code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -49,7 +49,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { ...@@ -49,7 +49,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
return code; return code;
} }
int walChangeFileToLast(SWal* pWal) { int walSetWrite(SWal* pWal) {
int64_t idxTfd, logTfd; int64_t idxTfd, logTfd;
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
...@@ -57,13 +57,13 @@ int walChangeFileToLast(SWal* pWal) { ...@@ -57,13 +57,13 @@ int walChangeFileToLast(SWal* pWal) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, fileFirstVer, fnameStr); walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr); idxTfd = tfOpenCreateWriteAppend(fnameStr);
if (idxTfd < 0) { if (idxTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
walBuildLogName(pWal, fileFirstVer, fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr); logTfd = tfOpenCreateWriteAppend(fnameStr);
if (logTfd < 0) { if (logTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -74,46 +74,57 @@ int walChangeFileToLast(SWal* pWal) { ...@@ -74,46 +74,57 @@ int walChangeFileToLast(SWal* pWal) {
return 0; return 0;
} }
int walChangeFile(SWal* pWal, int64_t ver) { int walChangeWrite(SWal* pWal, int64_t ver) {
int code = 0; int code = 0;
int64_t idxTfd, logTfd; int64_t idxTfd, logTfd;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (pWal->writeLogTfd != -1) {
code = tfClose(pWal->writeLogTfd); code = tfClose(pWal->writeLogTfd);
if (code != 0) { if (code != 0) {
// TODO
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
}
if (pWal->writeIdxTfd != -1) {
code = tfClose(pWal->writeIdxTfd); code = tfClose(pWal->writeIdxTfd);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
}
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
// bsearch in fileSet // bsearch in fileSet
SWalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); int32_t idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); ASSERT(idx != -1);
int64_t fileFirstVer = pRet->firstVer; SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, idx);
// closed /*ASSERT(pFileInfo != NULL);*/
if (taosArrayGetLast(pWal->fileInfoSet) != pRet) {
walBuildIdxName(pWal, fileFirstVer, fnameStr); int64_t fileFirstVer = pFileInfo->firstVer;
idxTfd = tfOpenRead(fnameStr);
walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenRead(fnameStr);
} else {
walBuildIdxName(pWal, fileFirstVer, fnameStr); walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr); idxTfd = tfOpenCreateWriteAppend(fnameStr);
if (idxTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pWal->writeIdxTfd = -1;
return -1;
}
walBuildLogName(pWal, fileFirstVer, fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr); logTfd = tfOpenCreateWriteAppend(fnameStr);
if (logTfd < 0) {
tfClose(idxTfd);
terrno = TAOS_SYSTEM_ERROR(errno);
pWal->writeLogTfd = -1;
return -1;
} }
pWal->writeLogTfd = logTfd; pWal->writeLogTfd = logTfd;
pWal->writeIdxTfd = idxTfd; pWal->writeIdxTfd = idxTfd;
pWal->writeCur = idx;
return fileFirstVer; return fileFirstVer;
} }
int walSeekVer(SWal* pWal, int64_t ver) { int walSeekWriteVer(SWal* pWal, int64_t ver) {
int code; int code;
if (ver == pWal->vers.lastVer) { if (ver == pWal->vers.lastVer) {
return 0; return 0;
...@@ -123,14 +134,15 @@ int walSeekVer(SWal* pWal, int64_t ver) { ...@@ -123,14 +134,15 @@ int walSeekVer(SWal* pWal, int64_t ver) {
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
} }
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeFile(pWal, ver); code = walChangeWrite(pWal, ver);
if (code != 0) { if (code != 0) {
return -1; return -1;
} }
} }
code = walSeekFilePos(pWal, ver); code = walSeekWritePos(pWal, ver);
if (code != 0) { if (code != 0) {
return -1; return -1;
} }
......
...@@ -46,12 +46,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -46,12 +46,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// find correct file // find correct file
if (ver < walGetLastFileFirstVer(pWal)) { if (ver < walGetLastFileFirstVer(pWal)) {
// close current files // change current files
tfClose(pWal->writeIdxTfd); code = walChangeWrite(pWal, ver);
tfClose(pWal->writeLogTfd); if (code < 0) {
// open old files
code = walChangeFile(pWal, ver);
if (code != 0) {
return -1; return -1;
} }
...@@ -166,7 +163,8 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -166,7 +163,8 @@ int32_t walEndSnapshot(SWal *pWal) {
} }
// iterate files, until the searched result // iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) { if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize)
|| (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
// delete according to file size or close time // delete according to file size or close time
deleteCnt++; deleteCnt++;
newTotSize -= iter->fileSize; newTotSize -= iter->fileSize;
...@@ -191,13 +189,12 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -191,13 +189,12 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
} }
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
;
pWal->totSize = newTotSize; pWal->totSize = newTotSize;
pWal->vers.verInSnapshotting = -1; pWal->vers.verInSnapshotting = -1;
// save snapshot ver, commit ver // save snapshot ver, commit ver
int code = walSaveMeta(pWal); int code = walSaveMeta(pWal);
if (code != 0) { if (code < 0) {
return -1; return -1;
} }
...@@ -225,18 +222,17 @@ int walRoll(SWal *pWal) { ...@@ -225,18 +222,17 @@ int walRoll(SWal *pWal) {
walBuildIdxName(pWal, newFileFirstVersion, fnameStr); walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
idxTfd = tfOpenCreateWriteAppend(fnameStr); idxTfd = tfOpenCreateWriteAppend(fnameStr);
if (idxTfd < 0) { if (idxTfd < 0) {
ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
walBuildLogName(pWal, newFileFirstVersion, fnameStr); walBuildLogName(pWal, newFileFirstVersion, fnameStr);
logTfd = tfOpenCreateWriteAppend(fnameStr); logTfd = tfOpenCreateWriteAppend(fnameStr);
if (logTfd < 0) { if (logTfd < 0) {
ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
code = walRollFileInfo(pWal); code = walRollFileInfo(pWal);
if (code != 0) { if (code != 0) {
ASSERT(0);
return -1; return -1;
} }
...@@ -291,8 +287,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -291,8 +287,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
ASSERT(pWal->writeCur >= 0); ASSERT(pWal->writeCur >= 0);
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) { if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) {
walChangeFileToLast(pWal); walSetWrite(pWal);
tfLseek(pWal->writeLogTfd, 0, SEEK_END);
tfLseek(pWal->writeIdxTfd, 0, SEEK_END);
} }
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
......
...@@ -107,6 +107,43 @@ class WalKeepEnv : public ::testing::Test { ...@@ -107,6 +107,43 @@ class WalKeepEnv : public ::testing::Test {
const char* pathName = "/tmp/wal_test"; const char* pathName = "/tmp/wal_test";
}; };
class WalRetentionEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit();
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override {
SWalCfg cfg;
cfg.rollPeriod = -1,
cfg.segSize = -1,
cfg.retentionPeriod = -1,
cfg.retentionSize = 0,
cfg.rollPeriod = 0,
cfg.vgId = 0,
cfg.level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, &cfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = "/tmp/wal_test";
};
TEST_F(WalCleanEnv, createNew) { TEST_F(WalCleanEnv, createNew) {
walRollFileInfo(pWal); walRollFileInfo(pWal);
ASSERT(pWal->fileInfoSet != NULL); ASSERT(pWal->fileInfoSet != NULL);
...@@ -283,3 +320,61 @@ TEST_F(WalKeepEnv, readHandleRead) { ...@@ -283,3 +320,61 @@ TEST_F(WalKeepEnv, readHandleRead) {
} }
} }
} }
TEST_F(WalRetentionEnv, repairMeta1) {
walResetEnv();
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len);
ASSERT_EQ(code, 0);
}
TearDown();
//getchar();
char buf[100];
sprintf(buf, "%s/meta-ver%d", pathName, 0);
remove(buf);
SetUp();
ASSERT_EQ(pWal->vers.lastVer, 99);
SWalReadHandle* pRead = walOpenReadHandle(pWal);
ASSERT(pRead != NULL);
for (int i = 0; i < 1000; i++) {
int ver = rand() % 100;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
// for(int i = 0; i < pRead->pHead->head.len; i++) {
// printf("%d ", pRead->pHead->head.body[i]);
//}
// printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver + 1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len);
for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
for (i = 100; i < 200; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len);
ASSERT_EQ(code, 0);
}
}
...@@ -342,7 +342,7 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa ...@@ -342,7 +342,7 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) { int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) {
void* item = taosArraySearch(pArray, key, comparFn, flags); void* item = taosArraySearch(pArray, key, comparFn, flags);
return (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize; return item == NULL ? -1 : (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize;
} }
void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) { void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <random> #include <random>
#include "tarray.h" #include "tarray.h"
#include "tcompare.h"
namespace { namespace {
...@@ -48,3 +49,34 @@ static void remove_batch_test() { ...@@ -48,3 +49,34 @@ static void remove_batch_test() {
TEST(arrayTest, array_list_test) { TEST(arrayTest, array_list_test) {
remove_batch_test(); remove_batch_test();
} }
TEST(arrayTest, array_search_test) {
SArray *pa = (SArray*) taosArrayInit(4, sizeof(int32_t));
for(int32_t i = 10; i < 20; ++i) {
int32_t a = i;
taosArrayPush(pa, &a);
}
for(int i = 0; i < 30; i++) {
int32_t k = i;
int32_t* pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_GE);
int32_t idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_GE);
if(pRet == NULL) {
ASSERT_EQ(idx, -1);
} else {
ASSERT_EQ(taosArrayGet(pa, idx), pRet);
}
pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_LE);
idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_LE);
if(pRet == NULL) {
ASSERT_EQ(idx, -1);
} else {
ASSERT_EQ(taosArrayGet(pa, idx), pRet);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册