diff --git a/deps/MsvcLibX/include/msvcUnistd.h b/deps/MsvcLibX/include/msvcUnistd.h index 9ad60625e07be8e9b749d1951375a3a501832366..9b59bae7f026d1daff65641f665f1df4ca6eb3b5 100644 --- a/deps/MsvcLibX/include/msvcUnistd.h +++ b/deps/MsvcLibX/include/msvcUnistd.h @@ -89,11 +89,12 @@ pid_t getppid(void); /* Get parent PID */ /* Path management */ #if defined(_WIN32) -#if defined(_UTF8_SOURCE) || defined(_BSD_SOURCE) || defined(_GNU_SOURCE) #define realpath realpathU +#if defined(_UTF8_SOURCE) || defined(_BSD_SOURCE) || defined(_GNU_SOURCE) +// #define realpath realpathU #define CompactPath CompactPathU #else /* _ANSI_SOURCE */ -#define realpath realpathA +// #define realpath realpathA #define CompactPath CompactPathA #endif #endif /* defined(_WIN32) */ diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 980524be965c94bd48185494d864d5dd5701f20e..3d88cf83127672f8f5780da695e24cee4850c035 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -59,7 +59,6 @@ char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string int8_t tsEnableCoreFile = 0; int32_t tsMaxBinaryDisplayWidth = 30; -char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/"; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -182,6 +181,7 @@ char tsDnodeDir[TSDB_FILENAME_LEN] = {0}; char tsMnodeDir[TSDB_FILENAME_LEN] = {0}; char tsDataDir[TSDB_FILENAME_LEN] = {0}; char tsScriptDir[TSDB_FILENAME_LEN] = {0}; +char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/"; int32_t tsDiskCfgNum = 0; diff --git a/src/os/inc/osDef.h b/src/os/inc/osDef.h index 04cb8b6e74b870d73c8f27707bd8a92f7b309e98..cb91b0526bdec17af1d5d86bfb24f8f95b56e01c 100644 --- a/src/os/inc/osDef.h +++ b/src/os/inc/osDef.h @@ -26,10 +26,6 @@ extern "C" { #endif #endif -#ifndef STDERR_FILENO -#define STDERR_FILENO (2) -#endif - #define FD_VALID(x) ((x) > STDERR_FILENO) #define FD_INITIALIZER ((int32_t)-1) diff --git a/src/os/inc/osWindows.h b/src/os/inc/osWindows.h index 6f96e4d1c80f5f376684f758140be16cc9498b65..d54d519cc3c893885a1c92b04501ee4b558290af 100644 --- a/src/os/inc/osWindows.h +++ b/src/os/inc/osWindows.h @@ -46,6 +46,8 @@ #include "msvcFcntl.h" #include "msvcLibgen.h" #include "msvcStdio.h" +#include "msvcUnistd.h" +#include "msvcLibgen.h" #include "sys/msvcStat.h" #include "sys/msvcTypes.h" @@ -144,7 +146,6 @@ typedef int (*__compar_fn_t)(const void *, const void *); #define in_addr_t unsigned long #define socklen_t int #define htobe64 htonll -#define getpid _getpid struct tm *localtime_r(const time_t *timep, struct tm *result); char * strptime(const char *buf, const char *fmt, struct tm *tm); @@ -153,15 +154,8 @@ char * getpass(const char *prefix); int flock(int fd, int option); int fsync(int filedes); char * strndup(const char *s, size_t n); -char * dirname(char *pszPathname); int gettimeofday(struct timeval *ptv, void *pTimeZone); -// for access function in io.h -#define F_OK 00 //Existence only -#define W_OK 02 //Write - only -#define R_OK 04 //Read - only -#define X_OK 06 //Read and write - // for send function in tsocket.c #define MSG_NOSIGNAL 0 #define SO_NO_CHECK 0x1234 @@ -208,8 +202,6 @@ typedef struct { int wordexp(char *words, wordexp_t *pwordexp, int flags); void wordfree(wordexp_t *pwordexp); -char *realpath(char *path, char *resolved_path); - #define openlog(a, b, c) #define closelog() #define LOG_ERR 0 diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 42f2ff3afe3c1efe7603af32985c6c52ddc84cd4..0b7b5ca487e5728a5dbdd93e45ee2b0593f98000 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -142,6 +142,8 @@ int64_t taosCopy(char *from, char *to) { if (bytes < sizeof(buffer)) break; } + fsync(fidto); + close(fidfrom); close(fidto); return size; diff --git a/src/os/src/windows/wString.c b/src/os/src/windows/wString.c index 1fb235a00575529be660cea18c1401f1d075a49b..67237e655cd2916ec2ec80daa01e7a0f9ae0d3e3 100644 --- a/src/os/src/windows/wString.c +++ b/src/os/src/windows/wString.c @@ -75,18 +75,6 @@ char *getpass(const char *prefix) { return passwd; } -char *strndup(const char *s, size_t n) { - size_t len = strlen(s); - if (len >= n) { - len = n; - } - - char *r = calloc(len + 1, 1); - memcpy(r, s, len); - r[len] = 0; - return r; -} - int twcslen(const wchar_t *wcs) { int *wstr = (int *)wcs; if (NULL == wstr) { diff --git a/src/os/src/windows/wWordexp.c b/src/os/src/windows/wWordexp.c index 929505516dede880e0354dbed63919198ae9e5d1..febe22ac8fa8566607fe2bf9b0be9cc7443c8012 100644 --- a/src/os/src/windows/wWordexp.c +++ b/src/os/src/windows/wWordexp.c @@ -38,7 +38,3 @@ int wordexp(char *words, wordexp_t *pwordexp, int flags) { } void wordfree(wordexp_t *pwordexp) {} - -char *realpath(char *path, char *resolved_path) { - return _fullpath(path, resolved_path, TSDB_FILENAME_LEN - 1); -} \ No newline at end of file diff --git a/src/tfs/src/tfs.c b/src/tfs/src/tfs.c index d942151843ae4573b6cfa3f65bdc315919364923..7b7c9b6127db0f1c8cabab35254ba4a128e05f05 100644 --- a/src/tfs/src/tfs.c +++ b/src/tfs/src/tfs.c @@ -593,7 +593,7 @@ void taosGetDisk() { tsAvailLogDirGB = (float)(diskSize.avail / unit); } - if (taosGetDiskSize("/tmp", &diskSize) == 0) { + if (taosGetDiskSize(tsTempDir, &diskSize) == 0) { tsTotalTmpDirGB = (float)(diskSize.tsize / unit); tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit); } diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 132e90f8d1fdaad8812c34d872cca19f79d417df..f1e2422e45b195e3874518132051ded67a776449 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -20,6 +20,8 @@ #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF #define TSDB_IVLD_FID INT_MIN +#define TSDB_FILE_STATE_OK 0 +#define TSDB_FILE_STATE_BAD 1 #define TSDB_FILE_INFO(tf) (&((tf)->info)) #define TSDB_FILE_F(tf) (&((tf)->f)) @@ -31,6 +33,10 @@ #define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf)) #define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf)) #define TSDB_FILE_FSYNC(tf) fsync(TSDB_FILE_FD(tf)) +#define TSDB_FILE_STATE(tf) ((tf)->state) +#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) +#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) +#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; @@ -47,10 +53,11 @@ typedef struct { SMFInfo info; TFILE f; int fd; + uint8_t state; } SMFile; void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver); -void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile); +void tsdbInitMFileEx(SMFile* pMFile, const SMFile* pOMFile); int tsdbEncodeSMFile(void** buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); int tsdbEncodeSMFileEx(void** buf, SMFile* pMFile); @@ -165,6 +172,7 @@ typedef struct { SDFInfo info; TFILE f; int fd; + uint8_t state; } SDFile; void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype); @@ -346,4 +354,14 @@ static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, *maxKey = *minKey + days * tsMsPerDay[precision] - 1; } +static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { + return false; + } + } + + return true; +} + #endif /* _TS_TSDB_FILE_H_ */ \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 3216b18459bb8a7028c54ff5cf7f17be81c2e154..a777b11186f064436137d267d73f9a70b5aa8701 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -52,7 +52,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo); static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen); static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); static int tsdbCommitTSData(STsdbRepo *pRepo); -static int tsdbStartCommit(STsdbRepo *pRepo); +static void tsdbStartCommit(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo, int eno); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCreateCommitIters(SCommitH *pCommith); @@ -84,10 +84,7 @@ static int tsdbApplyRtn(STsdbRepo *pRepo); static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); void *tsdbCommitData(STsdbRepo *pRepo) { - if (tsdbStartCommit(pRepo) < 0) { - tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } + tsdbStartCommit(pRepo); // Commit to update meta file if (tsdbCommitMeta(pRepo) < 0) { @@ -138,11 +135,15 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); if (tsdbCreateMFile(&mf, true) < 0) { + tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } + + tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); } else { tsdbInitMFileEx(&mf, pOMFile); if (tsdbOpenMFile(&mf, O_WRONLY) < 0) { + tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } } @@ -154,12 +155,20 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { if (pAct->act == TSDB_UPDATE_META) { pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { + tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); tsdbCloseMFile(&mf); + tsdbApplyMFileChange(&mf, pOMFile); + // TODO: need to reload metaCache return -1; } } else if (pAct->act == TSDB_DROP_META) { if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) { + tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); tsdbCloseMFile(&mf); + tsdbApplyMFileChange(&mf, pOMFile); + // TODO: need to reload metaCache return -1; } } else { @@ -168,6 +177,9 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { } if (tsdbUpdateMFileHeader(&mf) < 0) { + tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); + tsdbApplyMFileChange(&mf, pOMFile); + // TODO: need to reload metaCache return -1; } @@ -208,6 +220,8 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision)); pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision)); + tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, + pRtn->minFid, pRtn->midFid, pRtn->maxFid); } static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) { @@ -238,7 +252,7 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid)); if (pRecord != NULL) { - pMFile->info.tombSize += pRecord->size; + pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); } else { pMFile->info.nRecords++; } @@ -253,7 +267,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); if (pRecord == NULL) { - tsdbError("failed to drop KV store record with key %" PRIu64 " since not find", uid); + tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid); return -1; } @@ -264,11 +278,11 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { void *pBuf = buf; tsdbEncodeKVRecord(&pBuf, &rInfo); - if (tsdbAppendMFile(pMFile, buf, POINTER_DISTANCE(pBuf, buf), NULL) < 0) { + if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) { return -1; } - pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf)); + pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord)); pMFile->info.nDels++; pMFile->info.nRecords--; pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); @@ -302,7 +316,12 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { // Skip expired memory data and expired FSET tsdbSeekCommitIter(&commith, commith.rtn.minKey); while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) { - if (pSet->fid >= commith.rtn.minFid) break; + if (pSet->fid < commith.rtn.minFid) { + tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); + } else { + break; + } } // Loop to commit to each file @@ -349,7 +368,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { return 0; } -static int tsdbStartCommit(STsdbRepo *pRepo) { +static void tsdbStartCommit(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; ASSERT(pMem->numOfRows > 0 || listNEles(pMem->actList) > 0); @@ -360,7 +379,6 @@ static int tsdbStartCommit(STsdbRepo *pRepo) { tsdbStartFSTxn(pRepo, pMem->pointsAdd, pMem->storageAdd); pRepo->code = TSDB_CODE_SUCCESS; - return 0; } static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { @@ -413,14 +431,18 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { if (pIter->pTable == NULL) continue; if (tsdbCommitToTable(pCommith, tid) < 0) { - // TODO: revert the file change tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1; } } if (tsdbWriteBlockIdx(pCommith) < 0) { + tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1; } @@ -674,7 +696,11 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { TSDB_RUNLOCK_TABLE(pIter->pTable); - if (tsdbWriteBlockInfo(pCommith) < 0) return -1; + if (tsdbWriteBlockInfo(pCommith) < 0) { + tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + return -1; + } return 0; } @@ -926,6 +952,8 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) { if (nidx <= 0) { // All data are deleted + pHeadf->info.offset = 0; + pHeadf->info.len = 0; return 0; } @@ -1227,7 +1255,6 @@ static void tsdbResetCommitFile(SCommitH *pCommith) { } static void tsdbResetCommitTable(SCommitH *pCommith) { - tdResetDataCols(pCommith->pDataCols); taosArrayClear(pCommith->aSubBlk); taosArrayClear(pCommith->aSupBlk); pCommith->pTable = NULL; @@ -1256,6 +1283,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } + + tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet), + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); } else { pCommith->isRFileSet = false; } @@ -1266,6 +1296,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo))); if (tsdbCreateDFileSet(pWSet, true) < 0) { + tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), + TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno)); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); } @@ -1274,6 +1306,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid pCommith->isDFileSame = false; pCommith->isLFileSame = false; + + tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), + TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); } else { did.level = TSDB_FSET_LEVEL(pSet); did.id = TSDB_FSET_ID(pSet); @@ -1285,6 +1320,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); if (tsdbCreateDFile(pWHeadf, true) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), + tstrerror(terrno)); + if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; @@ -1296,7 +1334,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); tsdbInitDFileEx(pWDataf, pRDataf); if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { - tsdbCloseDFile(pWHeadf); + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); @@ -1313,6 +1354,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid pCommith->isLFileSame = true; if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), + tstrerror(terrno)); + tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { @@ -1325,6 +1369,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid pCommith->isLFileSame = false; if (tsdbCreateDFile(pWLastf, true) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), + tstrerror(terrno)); + tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { @@ -1360,7 +1407,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p if (pBlock->last) { if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; } else { - if (mergeRows < pCfg->maxRowsPerFileBlock) return true; + if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; } } @@ -1373,12 +1420,16 @@ static int tsdbApplyRtn(STsdbRepo *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); SDFileSet *pSet; - // Get retentioni snapshot + // Get retention snapshot tsdbGetRtnSnap(pRepo, &rtn); tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); while ((pSet = tsdbFSIterNext(&fsiter))) { - if (pSet->fid < rtn.minFid) continue; + if (pSet->fid < rtn.minFid) { + tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); + continue; + } if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { return -1; @@ -1392,10 +1443,13 @@ static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; SDFileSet nSet; STsdbFS * pfs = REPO_FS(pRepo); + int level; ASSERT(pSet->fid >= pRtn->minFid); - tfsAllocDisk(tsdbGetFidLevel(pSet->fid, pRtn), &(did.level), &(did.id)); + level = tsdbGetFidLevel(pSet->fid, pRtn); + + tfsAllocDisk(level, &(did.level), &(did.id)); if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; return -1; @@ -1406,12 +1460,17 @@ static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs)); if (tsdbCopyDFileSet(pSet, &nSet) < 0) { + tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); return -1; } if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { return -1; } + + tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); } else { // On a correct level if (tsdbUpdateDFileSet(pfs, pSet) < 0) { diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index b3b26bcf1fbbd8fdf5b4ef6894bab2a6c5947ac7..e6815083371693a5552372bcc792e14213ae3456 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -701,6 +701,8 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { int64_t maxBufSize = 0; SMFInfo minfo; + taosHashEmpty(pfs->metaCache); + // No meta file, just return if (pfs->cstatus->pmf == NULL) return 0; @@ -718,6 +720,12 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { while (true) { int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord)); if (tsize == 0) break; + + if (tsize < 0) { + tsdbError("vgId:%d failed to read META file since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + if (tsize < sizeof(SKVRecord)) { tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord), TSDB_FILE_FULL_NAME(pMFile)); @@ -840,7 +848,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo) { continue; } - if (tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) { + if (pfs->cstatus->pmf && tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) { continue; } diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 9a53bf45779ed3d609e5f5b1e6357ad0d72ad2bb..8124a0e3b5984eecee1f0934a385be56d5bd3b61 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -33,7 +33,7 @@ static int tsdbRollBackDFile(SDFile *pDFile); void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { char fname[TSDB_FILENAME_LEN]; - TSDB_FILE_SET_CLOSED(pMFile); + TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_OK); memset(&(pMFile->info), 0, sizeof(pMFile->info)); pMFile->info.magic = TSDB_FILE_INIT_MAGIC; @@ -42,7 +42,7 @@ void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { tfsInitFile(TSDB_FILE_F(pMFile), did.level, did.id, fname); } -void tsdbInitMFileEx(SMFile *pMFile, SMFile *pOMFile) { +void tsdbInitMFileEx(SMFile *pMFile, const SMFile *pOMFile) { *pMFile = *pOMFile; TSDB_FILE_SET_CLOSED(pMFile); } @@ -201,6 +201,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile)); pRepo->state |= TSDB_STATE_BAD_META; + TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD); return 0; } @@ -232,6 +233,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { tsdbError("vgId:%d meta file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), mfstat.st_size, pMFile->info.size); pRepo->state |= TSDB_STATE_BAD_META; + TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return 0; } else { @@ -293,6 +295,8 @@ static int tsdbRollBackMFile(SMFile *pMFile) { void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) { char fname[TSDB_FILENAME_LEN]; + TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK); + TSDB_FILE_SET_CLOSED(pDFile); memset(&(pDFile->info), 0, sizeof(pDFile->info)); @@ -439,6 +443,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile)); pRepo->state |= TSDB_STATE_BAD_DATA; + TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); return 0; } @@ -470,6 +475,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size); pRepo->state |= TSDB_STATE_BAD_DATA; + TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return 0; } else { diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 0931b6281be286aa6d0ac35b942beac8fca98cb3..73a127079920bcdf0cdc2d3e244998310c4d8dbc 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -216,11 +216,13 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { } int tsdbAsyncCommit(STsdbRepo *pRepo) { - if (pRepo->mem == NULL) return 0; - tsem_wait(&(pRepo->readyToCommit)); ASSERT(pRepo->imem == NULL); + if (pRepo->mem == NULL) { + tsem_post(&(pRepo->readyToCommit)); + return 0; + } if (pRepo->code != TSDB_CODE_SUCCESS) { tsdbWarn("vgId:%d try to commit when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno)); diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index bae4637d77775b03d64a7111a4570ef86a686f7c..88ab973f5ed85589996b2a90018036a51df48897 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -85,6 +85,7 @@ int32_t tsdbSyncRecv(void *tsdb, SOCKET socketFd) { pRepo->state = TSDB_STATE_OK; tsdbInitSyncH(&synch, pRepo, socketFd); + tsem_wait(&(pRepo->readyToCommit)); tsdbStartFSTxn(pRepo, 0, 0); if (tsdbSyncRecvMeta(&synch) < 0) { @@ -98,6 +99,7 @@ int32_t tsdbSyncRecv(void *tsdb, SOCKET socketFd) { } tsdbEndFSTxn(pRepo); + tsem_post(&(pRepo->readyToCommit)); tsdbDestroySyncH(&synch); // Reload file change @@ -107,6 +109,7 @@ int32_t tsdbSyncRecv(void *tsdb, SOCKET socketFd) { _err: tsdbEndFSTxnWithError(REPO_FS(pRepo)); + tsem_post(&(pRepo->readyToCommit)); tsdbDestroySyncH(&synch); return -1; } @@ -191,7 +194,8 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) { return 0; } - if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) { + if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0 || + TSDB_FILE_IS_BAD(pLMFile)) { // Local has no meta file or has a different meta file, need to copy from remote pSynch->mfChanged = true; @@ -409,7 +413,8 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { pSynch->pdf != NULL ? pSynch->pdf->fid : -1); pLSet = tsdbFSIterNext(&fsiter); } else { - if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) { + if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf) && + tsdbFSetIsOk(pLSet)) { // Just keep local files and notify remote not to send tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid); diff --git a/src/util/src/tconfig.c b/src/util/src/tconfig.c index eb96f81b3349bea1a67473843de2432b6b474d66..7a92750f8f2e89a93557d367b0c4772a2faaa57d 100644 --- a/src/util/src/tconfig.c +++ b/src/util/src/tconfig.c @@ -134,6 +134,11 @@ static bool taosReadDirectoryConfig(SGlobalCfg *cfg, char *input_value) { wordfree(&full_path); + char tmp[1025] = {0}; + if (realpath(option, tmp) != NULL) { + strcpy(option, tmp); + } + int code = taosMkDir(option, 0755); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c index 9b7916c8bd3b4c5878b0edc3431802b3ed7cd4f6..03f2b11eec239ad469faec66ddcbd60282e0409b 100644 --- a/src/vnode/src/vnodeCfg.c +++ b/src/vnode/src/vnodeCfg.c @@ -34,6 +34,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock; pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision; pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression; + pVnode->tsdbCfg.update = vnodeMsg->cfg.update; pVnode->tsdbCfg.cacheLastRow = vnodeMsg->cfg.cacheLastRow; pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel; pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod; @@ -227,6 +228,15 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { } vnodeMsg.cfg.quorum = (int8_t)quorum->valueint; + cJSON *update = cJSON_GetObjectItem(root, "update"); + if (!update || update->type != cJSON_Number) { + vError("vgId: %d, failed to read %s, update not found", pVnode->vgId, file); + vnodeMsg.cfg.update = 0; + vnodeMsg.cfg.vgCfgVersion = 0; + } else { + vnodeMsg.cfg.update = (int8_t)update->valueint; + } + cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file); @@ -325,6 +335,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) { len += snprintf(content + len, maxLen - len, " \"dbReplica\": %d,\n", pMsg->cfg.dbReplica); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum); + len += snprintf(content + len, maxLen - len, " \"update\": %d,\n", pMsg->cfg.update); len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); for (int32_t i = 0; i < pMsg->cfg.vgReplica; i++) { diff --git a/tests/pytest/insert/retentionpolicy.py b/tests/pytest/insert/retentionpolicy.py index c69060b5aec16b8d58aa2742e24c44d1ebe543ab..e0446113d6d1fb197490a09ebd1ebe4b5b12e66f 100644 --- a/tests/pytest/insert/retentionpolicy.py +++ b/tests/pytest/insert/retentionpolicy.py @@ -44,7 +44,8 @@ class TDTestRetetion: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows) os.system("sudo timedatectl set-ntp true") - time.sleep(40) + os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=1))) + time.sleep(5) tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) def run(self): @@ -63,7 +64,7 @@ class TDTestRetetion: tdLog.info("=============== step2") tdDnodes.stop(1) os.system("sudo timedatectl set-ntp false") - os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") + os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=48))) tdDnodes.start(1) cmd = 'insert into test values(now,5);' tdDnodes.stop(1) @@ -79,7 +80,7 @@ class TDTestRetetion: self.checkRows(5,cmd) tdLog.info("=============== step3") tdDnodes.stop(1) - os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") + os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=48))) tdDnodes.start(1) tdLog.info(cmd) tdSql.execute(cmd) @@ -99,18 +100,19 @@ class TDTestRetetion: tdLog.info(cmd) tdSql.execute(cmd) self.queryRows=tdSql.query('select * from test') - self.checkRows(7,cmd) + self.checkRows(5,cmd) tdLog.info("=============== step5") tdDnodes.stop(1) tdDnodes.start(1) cmd='select * from test where ts > now-1d' self.queryRows=tdSql.query('select * from test where ts > now-1d') - self.checkRows(1,cmd) + self.checkRows(2,cmd) def stop(self): os.system("sudo timedatectl set-ntp true") - time.sleep(40) + os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=1))) + time.sleep(5) tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/pytest_2.sh b/tests/pytest/pytest_2.sh index dde9f78953766d41bb05fa5639b3ca4e18f4ef2a..4ec517a0bf1c5eff8ad670cf28ab63d5ce818460 100755 --- a/tests/pytest/pytest_2.sh +++ b/tests/pytest/pytest_2.sh @@ -1,18 +1,18 @@ # update -#python3 ./test.py -f update/allow_update.py +python3 ./test.py -f update/allow_update.py python3 ./test.py -f update/allow_update-0.py python3 ./test.py -f update/append_commit_data.py python3 ./test.py -f update/append_commit_last-0.py python3 ./test.py -f update/append_commit_last.py -#python3 ./test.py -f update/merge_commit_data.py -#python3 ./test.py -f update/merge_commit_data-0.py -#python3 ./test.py -f update/merge_commit_data2.py -#python3 ./test.py -f update/merge_commit_data2_update0.py -#python3 ./test.py -f update/merge_commit_last-0.py -#python3 ./test.py -f update/merge_commit_last.py -#python3 ./test.py -f update/bug_td2279.py +python3 ./test.py -f update/merge_commit_data.py +python3 ./test.py -f update/merge_commit_data-0.py +python3 ./test.py -f update/merge_commit_data2.py +python3 ./test.py -f update/merge_commit_data2_update0.py +python3 ./test.py -f update/merge_commit_last-0.py +python3 ./test.py -f update/merge_commit_last.py +python3 ./test.py -f update/bug_td2279.py # wal python3 ./test.py -f wal/addOldWalTest.py diff --git a/tests/script/wtest.bat b/tests/script/wtest.bat index 0b5cdda52787971c115a96c3674689f4a42504fc..9ed58a90d1ba6bd781d306070e03f56b75984093 100644 --- a/tests/script/wtest.bat +++ b/tests/script/wtest.bat @@ -44,10 +44,10 @@ echo serverPort 7100 >> %TAOS_CFG% echo logDir %LOG_DIR% >> %TAOS_CFG% echo scriptDir %SCRIPT_DIR% >> %TAOS_CFG% echo numOfLogLines 100000000 >> %TAOS_CFG% -echo rpcDebugFlag 143 >> %TAOS_CFG% +echo rpcDebugFlag 135 >> %TAOS_CFG% echo tmrDebugFlag 131 >> %TAOS_CFG% -echo cDebugFlag 143 >> %TAOS_CFG% -echo udebugFlag 143 >> %TAOS_CFG% +echo cDebugFlag 135 >> %TAOS_CFG% +echo udebugFlag 135 >> %TAOS_CFG% echo wal 0 >> %TAOS_CFG% echo asyncLog 0 >> %TAOS_CFG% echo locale en_US.UTF-8 >> %TAOS_CFG%