diff --git a/src/os/inc/osFile.h b/src/os/inc/osFile.h index 62e44d8eb0b70fb1526693895847637daa72247a..63f93bc0a127ee575977c230a39690d132a4f5d3 100644 --- a/src/os/inc/osFile.h +++ b/src/os/inc/osFile.h @@ -35,6 +35,7 @@ int64_t taosReadImp(int32_t fd, void *buf, int64_t count); int64_t taosWriteImp(int32_t fd, void *buf, int64_t count); int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence); int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath); +int64_t taosCopy(char *from, char *to); #define taosRead(fd, buf, count) taosReadImp(fd, buf, count) #define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count) diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 880063df7b359a73681a87bde6b73c0e1f6934c8..73806143826026df7c0e10fec4c66a15875e7cb4 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -119,11 +119,11 @@ int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { return (int64_t)tlseek(fd, (long)offset, whence); } -ssize_t taosTCopy(char *from, char *to) { +int64_t taosCopy(char *from, char *to) { char buffer[4096]; int fidto = -1, fidfrom = -1; - ssize_t size = 0; - ssize_t bytes; + int64_t size = 0; + int64_t bytes; fidfrom = open(from, O_RDONLY); if (fidfrom < 0) goto _err; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 637b02cd32ae8ad8e4077684609dcac23922d8a0..04ea90e299602419e25aadcea00db12bf7eb9d32 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -21,6 +21,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); +static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key); void *tsdbCommitData(STsdbRepo *pRepo) { SMemTable * pMem = pRepo->imem; @@ -42,8 +43,6 @@ void *tsdbCommitData(STsdbRepo *pRepo) { goto _err; } - tsdbFitRetention(pRepo); - tsdbInfo("vgId:%d commit over, succeed", REPO_ID(pRepo)); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); @@ -65,9 +64,16 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { SCommitIter *iters = NULL; SRWHelper whelper = {0}; STsdbCfg * pCfg = &(pRepo->config); + SFidGroup fidGroup = {0}; + TSKEY minKey = 0; + TSKEY maxKey = 0; if (pMem->numOfRows <= 0) return 0; + tsdbGetFidGroup(pCfg, &fidGroup); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fidGroup.minFid, &minKey, &maxKey); + tsdbRemoveFilesBeyondRetention(pRepo, &fidGroup); + iters = tsdbCreateCommitIters(pRepo); if (iters == NULL) { tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -89,14 +95,20 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + tsdbSeekCommitIter(iters, pMem->maxTables, minKey); + // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { + if (fid < fidGroup.minFid) continue; + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _err; } } + tsdbApplyRetention(pRepo, &fidGroup); + tdFreeDataCols(pDataCols); tsdbDestroyCommitIters(iters, pMem->maxTables); tsdbDestroyHelper(&whelper); @@ -173,7 +185,6 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK } static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { - char * dataDir = NULL; STsdbCfg * pCfg = &pRepo->config; STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pGroup = NULL; @@ -190,15 +201,17 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe return 0; } - // Create and open files for commit - dataDir = tsdbGetDataDirName(pRepo->rootDir); - if (dataDir == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; + if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) { + pGroup = tsdbCreateFGroup(pRepo, fid); + if (pGroup == NULL) { + tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + return -1; + } } - if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { - tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + // Open files for write/read + if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { + tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } @@ -259,7 +272,6 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe goto _err; } - tfree(dataDir); tsdbCloseHelperFile(pHelper, 0, pGroup); pthread_rwlock_wrlock(&(pFileH->fhlock)); @@ -281,7 +293,6 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe return 0; _err: - tfree(dataDir); tsdbCloseHelperFile(pHelper, 1, pGroup); return -1; } @@ -338,3 +349,13 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { free(iters); } + +static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) { + for (int i = 0; i < nIters; i++) { + SCommitIter *pIter = pIters + i; + if (pIter->pTable == NULL) continue; + if (pIter->pIter == NULL) continue; + + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0, true, NULL); + } +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index c89c66d50b751032537742cd1000bc78fbead2eb..1bfa09fc11343436e46aebb1c5e879e21f0fb828 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -476,7 +476,7 @@ int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { } for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - if (taosTCopy(oFileGroup.files[type].fname, nFileGroup.files[type].fname) < 0) return -1; + if (taosCopy(oFileGroup.files[type].fname, nFileGroup.files[type].fname) < 0) return -1; } pthread_rwlock_wrlock(&(pFileH->fhlock)); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 71944c87c6d68d530e07a13a05bf5ac89ee3754d..adb33bd23bbdb7444ede35f8c6b293bb8f5779a9 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -262,6 +262,9 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey bool isRowDel = false; int filterIter = 0; SDataRow row = NULL; + SMergeInfo mInfo; + + if (pMergeInfo == NULL) pMergeInfo = &mInfo; memset(pMergeInfo, 0, sizeof(*pMergeInfo)); pMergeInfo->keyFirst = INT64_MAX; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index dacc18c4114f485a98506a0aa8c970c050fb4dab..fd9c07e7d81ca5ef26cfc356286d053c0dd27648 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -21,8 +21,13 @@ #include "trpc.h" #include "tsdb.h" #include "tutil.h" +#include "dnode.h" #include "vnode.h" #include "vnodeInt.h" +#include "vnodeCfg.h" +#include "vnodeVersion.h" +#include "dnodeVWrite.h" +#include "dnodeVRead.h" #include "query.h" #include "tpath.h" #include "tdisk.h"