diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 0a55b9050c2b25c3a7a2aefd716652bb8d23bc95..2238aa7fc87231a48af404c998c234336c090e20 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -346,6 +346,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); int tsdbCpySFile(SFile* src, SFile* dst); +void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index e268aa1d6548c362abc33efbe775016e70b63484..4c5681de9341dedec9cd04413b6643f6a78cfa8b 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -35,7 +35,6 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static void tsdbDestroyFile(SFile *pFile); static int compFGroup(const void *arg1, const void *arg2); static int keyFGroupCompFunc(const void *key, const void *fgroup); -static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup); // ---------------- INTERNAL FUNCTIONS ---------------- STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -356,6 +355,26 @@ int tsdbCpySFile(SFile *src, SFile *dst) { return 0; } +void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { + ASSERT(pFGroup != NULL); + STsdbFileH *pFileH = pRepo->tsdbFileH; + + SFileGroup fileGroup = *pFGroup; + + int nFilesLeft = pFileH->nFGroups - (POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); + if (nFilesLeft > 0) { + memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft); + } + + pFileH->nFGroups--; + ASSERT(pFileH->nFGroups >= 0); + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + remove(fileGroup.files[type].fname); + tsdbDestroyFile(&fileGroup.files[type]); + } +} + // ---------------- LOCAL FUNCTIONS ---------------- static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { uint32_t version; @@ -418,23 +437,3 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) { return fid > pFGroup->fileId ? 1 : -1; } } - -static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { - ASSERT(pFGroup != NULL); - STsdbFileH *pFileH = pRepo->tsdbFileH; - - SFileGroup fileGroup = *pFGroup; - - int nFilesLeft = pFileH->nFGroups - (POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); - if (nFilesLeft > 0) { - memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft); - } - - pFileH->nFGroups--; - ASSERT(pFileH->nFGroups >= 0); - - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - remove(fileGroup.files[type].fname); - tsdbDestroyFile(&fileGroup.files[type]); - } -} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index faf69d87d3b2c071909eeba586be854ffe31e8dc..75f9be67712fdebd2689a04fa3524f9232b4a225 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -61,7 +61,7 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbRestoreInfo(STsdbRepo *pRepo); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); -static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); +static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); static int keyFGroupCompFunc(const void *key, const void *fgroup); @@ -270,10 +270,11 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { } int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { + // TODO: think about multithread cases STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbCfg * pRCfg = &pRepo->config; - if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_TDB_INVALID_CONFIG; + if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; ASSERT(pRCfg->tsdbId == pCfg->tsdbId); ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); @@ -284,25 +285,33 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { bool configChanged = false; if (pRCfg->compression != pCfg->compression) { - configChanged = true; tsdbAlterCompression(pRepo, pCfg->compression); + configChanged = true; } if (pRCfg->keep != pCfg->keep) { + if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) { + tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } configChanged = true; - tsdbAlterKeep(pRepo, pCfg->keep); } if (pRCfg->totalBlocks != pCfg->totalBlocks) { - configChanged = true; tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); + configChanged = true; } if (pRCfg->maxTables != pCfg->maxTables) { - configChanged = true; tsdbAlterMaxTables(pRepo, pCfg->maxTables); + configChanged = true; } - if (configChanged) tsdbSaveConfig(pRepo->rootDir, &pRepo->config); + if (configChanged) { + if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) { + tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + } - return TSDB_CODE_SUCCESS; + return 0; } void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { @@ -351,6 +360,7 @@ int tsdbLockRepo(STsdbRepo *pRepo) { } int tsdbUnlockRepo(STsdbRepo *pRepo) { + ASSERT(IS_REPO_LOCKED(pRepo)); pRepo->repoLocked = false; int code = pthread_mutex_unlock(&pRepo->mutex); if (code != 0) { @@ -832,31 +842,57 @@ static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { } static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { - int8_t oldCompRession = pRepo->config.compression; + int8_t ocompression = pRepo->config.compression; pRepo->config.compression = compression; - tsdbTrace("vgId:%d tsdb compression is changed from %d to %d", oldCompRession, compression); + tsdbTrace("vgId:%d tsdb compression is changed from %d to %d", ocompression, compression); } -static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { - STsdbCfg *pCfg = &pRepo->config; - int oldKeep = pCfg->keep; +static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { + STsdbCfg * pCfg = &pRepo->config; + STsdbFileH *pFileH = pRepo->tsdbFileH; + int okeep = pCfg->keep; + SFileGroup *pFGroup = NULL; + + ASSERT(pCfg->keep != keep); + int maxFiles = TSDB_MAX_FILE(keep, pCfg->daysPerFile); + + if (maxFiles != pFileH->maxFGroups) { + pthread_rwlock_wrlock(&(pFileH->fhlock)); + + pCfg->keep = keep; + pFGroup = (SFileGroup *)calloc(maxFiles, sizeof(SFileGroup)); + if (pFGroup == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + pthread_rwlock_unlock(&(pFileH->fhlock)); + return -1; + } - int maxFiles = keep / pCfg->maxTables + 3; - if (pRepo->config.keep > keep) { - pRepo->config.keep = keep; - pRepo->tsdbFileH->maxFGroups = maxFiles; - } else { - pRepo->config.keep = keep; - pRepo->tsdbFileH->pFGroup = realloc(pRepo->tsdbFileH->pFGroup, sizeof(SFileGroup)); - if (pRepo->tsdbFileH->pFGroup == NULL) { - // TODO: deal with the error + int mfid = TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) - + TSDB_MAX_FILE(keep, pCfg->daysPerFile); + + int i = 0; + for (; i < pFileH->nFGroups; i++) { + if (pFileH->pFGroup[i].fileId >= mfid) break; + tsdbRemoveFileGroup(pRepo, &(pFileH->pFGroup[i])); + } + + for (int j = 0; i < pFileH->nFGroups; i++, j++) { + pFGroup[j] = pFileH->pFGroup[i]; } - pRepo->tsdbFileH->maxFGroups = maxFiles; + + free(pFileH->pFGroup); + pFileH->pFGroup = pFGroup; + + pthread_rwlock_unlock(&(pFileH->fhlock)); } - tsdbTrace("vgId:%d, keep is changed from %d to %d", pRepo->config.tsdbId, oldKeep, keep); + + tsdbTrace("vgId:%d keep is changed from %d to %d", REPO_ID(pRepo), okeep, keep); + + return 0; } static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { + // TODO int oldMaxTables = pRepo->config.maxTables; if (oldMaxTables < pRepo->config.maxTables) { // TODO