diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index a5bfb0fa1ebd5f43619e18b7d31a0d77545c6773..b7894fd38622a8fee83d19d84d628381eb057904 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -192,6 +192,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP, 0, 0x060C, "tsdb submi TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb invalid action") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index aaa84884a197b5518dfa7ed9f39a6b679838b700..c26beba21ced392eeb2ffc6489d40d90a6488eb5 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -116,8 +116,7 @@ typedef enum { TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, - TSDB_FILE_TYPE_NHEAD, - TSDB_FILE_TYPE_NLAST + TSDB_FILE_TYPE_MAX, } TSDB_FILE_TYPE; typedef struct { @@ -137,10 +136,8 @@ typedef struct { } SFile; typedef struct { - int fileId; - SFile headF; - SFile dataF; - SFile lastF; + int fileId; + SFile files[TSDB_FILE_TYPE_MAX]; } SFileGroup; typedef struct { @@ -313,11 +310,23 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) #define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId #define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId +#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0) #define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); void tsdbFreeFileH(STsdbFileH* pFileH); +int* tsdbOpenFileH(STsdbRepo* pRepo); +void tsdbCloseFileH(STsdbRepo* pRepo); +SFileGroup* tsdbCreateFGroupIfNeed(STsdbFileH* pFileH, char* dataDir, int fid, int maxTables); +void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); +void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); +SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); +int tsdbOpenFile(SFile* pFile, int oflag); +void tsdbCloseFile(SFile* pFile); +int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); +SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); +void tsdbFitRetention(STsdbRepo* pRepo); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state @@ -349,7 +358,6 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); // --------- Helper state - int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo); int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo); void tsdbDestroyHelper(SRWHelper *pHelper); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 59bc2f53bbf38ca5061f951b902bc77c70104323..25f8e032ed5504c68e16faea1b13ce24bba91708 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -29,7 +29,13 @@ #include "tutil.h" #include "ttime.h" -const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".h", ".l"}; +const char *tsdbFileSuffix[] = {".head", ".data", ".last"}; + +static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); +static void tsdbDestroyFile(SFile *pFile); +static int compFGroup(const void *arg1, const void *arg2); +static int keyFGroupCompFunc(const void *key, const void *fgroup); +static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup); // ---------------- INTERNAL FUNCTIONS ---------------- STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -39,7 +45,7 @@ STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { goto _err; } - int code = pthread_rwlock_init(&(pFileH->fhlock)); + int code = pthread_rwlock_init(&(pFileH->fhlock), NULL); if (code != 0) { tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code)); terrno = TAOS_SYSTEM_ERROR(code); @@ -76,6 +82,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) { DIR * dir = NULL; int fid = 0; + SFileGroup fileGroup = {0}; STsdbFileH pFileH = pRepo->tsdbFileH; tDataDir = tsdbGetDataDirName(pRepo->rootDir); @@ -95,21 +102,22 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) { while ((dp = readdir(dir)) != NULL) { if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; sscanf(dp->d_name, "f%d", &fid); - - SFileGroup fileGroup = {0}; - - if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue; - fileGroup.fileId = fid; - for (int type = TSDB_FILE_TYPE_HEAD; type <= TSDB_FILE_TYPE_LAST; type++) { - fileGroup.headF.fname = tsdbGetDataFileName(pRepo, fid, type); - if (fileGroup.headF.fname == NULL) goto _err; - if (tsdbInitFile(fileGroup.headF)) + if (tsdbSearchFGroup(pRepo->tsdbFileH, fid, TD_EQ) != NULL) return 0; + fileGroup = {0}; + fileGroup.fileId = fid; + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) { + tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type); + goto _err; + } } - for (int type = TSDB_FILE_TYPE_NHEAD; type <= TSDB_FILE_TYPE_NLAST; type++) { - } + tsdbTrace("vgId:%d file group %d init", REPO_ID(pRepo), fid); + + pFileH->[pFileH->nFGroups++] = fileGroup; + qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); } tfree(tDataDir); @@ -117,8 +125,11 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) { return 0; _err: + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); + tfree(tDataDir); if (dir != NULL) closedir(tDataDir); + tsdbCloseFileH(pRepo); return -1; } @@ -126,17 +137,14 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { STsdbFileH *pFileH = pRepo->tsdbFileH; for (int i = 0; i < pFileH->nFGroups; i++) { - // TODO - + SFileGroup *pFGroup = pFileH->pFGroup + i; + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbDestroyFile(pFGroup->files[type]); + } } } -/** - * Create the file group if the file group not exists. - * - * @return A pointer to - */ -SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { +SFileGroup *tsdbCreateFGroupIfNeed(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL; SFileGroup fGroup; @@ -158,35 +166,15 @@ SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int max return pGroup; _err: - // TODO: deal with the err here + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); return NULL; } -int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { - SFileGroup *pGroup = - bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc); - if (pGroup == NULL) return -1; - - // Remove from disk - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - remove(pGroup->files[type].fname); - } - - // Adjust the memory - int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1); - if (filesBehind > 0) { - memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind); - } - pFileH->numOfFGroups--; - - return 0; -} - -void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO pIter->direction = direction; pIter->base = pFileH->fGroup; pIter->numOfFGroups = pFileH->numOfFGroups; - if (pFileH->numOfFGroups == 0){ + if (pFileH->numOfFGroups == 0) { pIter->pFileGroup = NULL; } else { if (direction == TSDB_FGROUP_ITER_FORWARD) { @@ -197,25 +185,13 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct } } -void tsdbFitRetention(STsdbRepo *pRepo) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup *pGroup = pFileH->fGroup; - - int mfid = - tsdbGetKeyFileId(taosGetTimestamp(pRepo->config.precision), pRepo->config.daysPerFile, pRepo->config.precision) - pFileH->maxFGroups + 3; - - while (pFileH->numOfFGroups > 0 && pGroup[0].fileId < mfid) { - tsdbRemoveFileGroup(pFileH, pGroup[0].fileId); - } -} - -void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { // TODO if (pIter->numOfFGroups == 0) { assert(pIter->pFileGroup == NULL); return; } - - int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; + + int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); if (ptr == NULL) { pIter->pFileGroup = NULL; @@ -224,7 +200,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { } } -SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {//TODO SFileGroup *ret = pIter->pFileGroup; if (ret == NULL) return NULL; @@ -264,20 +240,21 @@ void tsdbCloseFile(SFile *pFile) { } } -int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) { +int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { memset((void *)pFile, 0, sizeof(SFile)); pFile->fd = -1; - tsdbGetFileName(dataDir, fileId, suffix, pFile->fname); - + pFile->fname = tsdbGetDataFileName(pRepo, fid, type); + if (pFile->fname == NULL) return -1; + if (access(pFile->fname, F_OK) == 0) { - // File already exists - return -1; + tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), fid); + terrno = TSDB_CODE_TDB_FILE_ALREADY_EXISTS; + goto _err; } if (tsdbOpenFile(pFile, O_RDWR | O_CREAT) < 0) { - // TODO: deal with the ERROR here - return -1; + goto _err; } pFile->info.size = TSDB_FILE_HEAD_SIZE; @@ -290,6 +267,9 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) tsdbCloseFile(pFile); return 0; + +_err: + return -1; } SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { @@ -299,18 +279,46 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { return (SFileGroup *)ptr; } +void tsdbFitRetention(STsdbRepo *pRepo) { + STsdbCfg *pCfg = &(pRepo->config); + STsdbFileH *pFileH = pRepo->tsdbFileH; + SFileGroup *pGroup = pFileH->pFGroup; + + int mfid = TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) - + TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile); + + pthread_rwlock_wrlock(&(pFileH->fhlock)); + + while (pFileH->numOfFGroups > 0 && pGroup[0].fileId < mfid) { + tsdbRemoveFileGroup(pFileH, pGroup); + } + + pthread_rwlock_unlock(&(pFileH->fhlock)) +} + + // ---------------- LOCAL FUNCTIONS ---------------- -static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) { +static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { uint32_t version; - char buf[512] = "\0"; + char buf[512] = "\0"; + + pFile->fname = tsdbGetDataFileName(pRepo, fid, type); + if (pFile->fname == NULL) return -1; - tsdbGetFileName(dataDir, fid, suffix, pFile->fname); - if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; pFile->fd = -1; - if (tsdbOpenFile(pFile, O_RDONLY) < 0) return -1; + if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; - if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1; - if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1; + if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILE_HEAD_SIZE, + pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { + tsdbError("vgId:%d file %s head part is corrupted", REPO_ID(pRepo), pFile->fname); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } void *pBuf = buf; pBuf = taosDecodeFixedU32(pBuf, &version); @@ -319,21 +327,15 @@ static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile tsdbCloseFile(pFile); return 0; +_err: + tsdbDestroyFile(pFile); + return -1; } -// static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { -// if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0; - -// SFileGroup fGroup = {0}; -// fGroup.fileId = fid; - -// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { -// if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1; -// } -// pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; -// qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); -// return 0; -// } +static void tsdbDestroyFile(SFile *pFile) { + tsdbCloseFile(pFile); + tfree(pFile->fname); +} static int compFGroup(const void *arg1, const void *arg2) { int val1 = ((SFileGroup *)arg1)->fileId; @@ -356,4 +358,24 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) { } else { return fid > pFGroup->fileId ? 1 : -1; } +} + +static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { + ASSERT(pFGroup != NULL); + STsdbFileH *pFileH = pRepo->tsdbFileH; + + SFileGroup fileGroup = *pFGroup; + + int nFilesLeft = pFileH->nFGroups - (POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); + if (nFilesLeft > 0) { + memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft); + } + + pFileH->nFGroups--; + ASSERT(pFileH->nFGroups >= 0); + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + remove(fileGroup.files[type].fname); + tsdbDestroyFile(&fileGroup.files[type]); + } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 75bfe0d7ce7647fdecf2ab268e65cbf4fe905ed4..0c8bca9441e07f133804ab85589d1a6cda96b6ba 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -464,7 +464,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe return -1; } - if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) { + if ((pGroup = tsdbCreateFGroupIfNeed(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) { tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _err; }