提交 76b7e3c8 编写于 作者: H Hongze Cheng

TD-353

上级 a60fc0f7
...@@ -346,6 +346,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); ...@@ -346,6 +346,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
int tsdbCpySFile(SFile* src, SFile* dst); int tsdbCpySFile(SFile* src, SFile* dst);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
......
...@@ -35,7 +35,6 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); ...@@ -35,7 +35,6 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile); static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2); static int compFGroup(const void *arg1, const void *arg2);
static int keyFGroupCompFunc(const void *key, const void *fgroup); static int keyFGroupCompFunc(const void *key, const void *fgroup);
static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
...@@ -356,6 +355,26 @@ int tsdbCpySFile(SFile *src, SFile *dst) { ...@@ -356,6 +355,26 @@ int tsdbCpySFile(SFile *src, SFile *dst) {
return 0; return 0;
} }
void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
ASSERT(pFGroup != NULL);
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup fileGroup = *pFGroup;
int nFilesLeft = pFileH->nFGroups - (POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
if (nFilesLeft > 0) {
memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
}
pFileH->nFGroups--;
ASSERT(pFileH->nFGroups >= 0);
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
remove(fileGroup.files[type].fname);
tsdbDestroyFile(&fileGroup.files[type]);
}
}
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version; uint32_t version;
...@@ -418,23 +437,3 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) { ...@@ -418,23 +437,3 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
return fid > pFGroup->fileId ? 1 : -1; return fid > pFGroup->fileId ? 1 : -1;
} }
} }
static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
ASSERT(pFGroup != NULL);
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup fileGroup = *pFGroup;
int nFilesLeft = pFileH->nFGroups - (POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
if (nFilesLeft > 0) {
memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
}
pFileH->nFGroups--;
ASSERT(pFileH->nFGroups >= 0);
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
remove(fileGroup.files[type].fname);
tsdbDestroyFile(&fileGroup.files[type]);
}
}
\ No newline at end of file
...@@ -61,7 +61,7 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); ...@@ -61,7 +61,7 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbRestoreInfo(STsdbRepo *pRepo); static int tsdbRestoreInfo(STsdbRepo *pRepo);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup); static int keyFGroupCompFunc(const void *key, const void *fgroup);
...@@ -270,10 +270,11 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { ...@@ -270,10 +270,11 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) {
} }
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg * pRCfg = &pRepo->config; STsdbCfg * pRCfg = &pRepo->config;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_TDB_INVALID_CONFIG; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId); ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
...@@ -284,25 +285,33 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { ...@@ -284,25 +285,33 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
bool configChanged = false; bool configChanged = false;
if (pRCfg->compression != pCfg->compression) { if (pRCfg->compression != pCfg->compression) {
configChanged = true;
tsdbAlterCompression(pRepo, pCfg->compression); tsdbAlterCompression(pRepo, pCfg->compression);
configChanged = true;
} }
if (pRCfg->keep != pCfg->keep) { if (pRCfg->keep != pCfg->keep) {
if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) {
tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
configChanged = true; configChanged = true;
tsdbAlterKeep(pRepo, pCfg->keep);
} }
if (pRCfg->totalBlocks != pCfg->totalBlocks) { if (pRCfg->totalBlocks != pCfg->totalBlocks) {
configChanged = true;
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
configChanged = true;
} }
if (pRCfg->maxTables != pCfg->maxTables) { if (pRCfg->maxTables != pCfg->maxTables) {
configChanged = true;
tsdbAlterMaxTables(pRepo, pCfg->maxTables); tsdbAlterMaxTables(pRepo, pCfg->maxTables);
configChanged = true;
} }
if (configChanged) tsdbSaveConfig(pRepo->rootDir, &pRepo->config); if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) {
tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
return TSDB_CODE_SUCCESS; return 0;
} }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
...@@ -351,6 +360,7 @@ int tsdbLockRepo(STsdbRepo *pRepo) { ...@@ -351,6 +360,7 @@ int tsdbLockRepo(STsdbRepo *pRepo) {
} }
int tsdbUnlockRepo(STsdbRepo *pRepo) { int tsdbUnlockRepo(STsdbRepo *pRepo) {
ASSERT(IS_REPO_LOCKED(pRepo));
pRepo->repoLocked = false; pRepo->repoLocked = false;
int code = pthread_mutex_unlock(&pRepo->mutex); int code = pthread_mutex_unlock(&pRepo->mutex);
if (code != 0) { if (code != 0) {
...@@ -832,31 +842,57 @@ static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { ...@@ -832,31 +842,57 @@ static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
} }
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
int8_t oldCompRession = pRepo->config.compression; int8_t ocompression = pRepo->config.compression;
pRepo->config.compression = compression; pRepo->config.compression = compression;
tsdbTrace("vgId:%d tsdb compression is changed from %d to %d", oldCompRession, compression); tsdbTrace("vgId:%d tsdb compression is changed from %d to %d", ocompression, compression);
} }
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
STsdbCfg *pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
int oldKeep = pCfg->keep; STsdbFileH *pFileH = pRepo->tsdbFileH;
int okeep = pCfg->keep;
SFileGroup *pFGroup = NULL;
ASSERT(pCfg->keep != keep);
int maxFiles = TSDB_MAX_FILE(keep, pCfg->daysPerFile);
if (maxFiles != pFileH->maxFGroups) {
pthread_rwlock_wrlock(&(pFileH->fhlock));
pCfg->keep = keep;
pFGroup = (SFileGroup *)calloc(maxFiles, sizeof(SFileGroup));
if (pFGroup == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
pthread_rwlock_unlock(&(pFileH->fhlock));
return -1;
}
int maxFiles = keep / pCfg->maxTables + 3; int mfid = TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) -
if (pRepo->config.keep > keep) { TSDB_MAX_FILE(keep, pCfg->daysPerFile);
pRepo->config.keep = keep;
pRepo->tsdbFileH->maxFGroups = maxFiles; int i = 0;
} else { for (; i < pFileH->nFGroups; i++) {
pRepo->config.keep = keep; if (pFileH->pFGroup[i].fileId >= mfid) break;
pRepo->tsdbFileH->pFGroup = realloc(pRepo->tsdbFileH->pFGroup, sizeof(SFileGroup)); tsdbRemoveFileGroup(pRepo, &(pFileH->pFGroup[i]));
if (pRepo->tsdbFileH->pFGroup == NULL) { }
// TODO: deal with the error
for (int j = 0; i < pFileH->nFGroups; i++, j++) {
pFGroup[j] = pFileH->pFGroup[i];
} }
pRepo->tsdbFileH->maxFGroups = maxFiles;
free(pFileH->pFGroup);
pFileH->pFGroup = pFGroup;
pthread_rwlock_unlock(&(pFileH->fhlock));
} }
tsdbTrace("vgId:%d, keep is changed from %d to %d", pRepo->config.tsdbId, oldKeep, keep);
tsdbTrace("vgId:%d keep is changed from %d to %d", REPO_ID(pRepo), okeep, keep);
return 0;
} }
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
// TODO
int oldMaxTables = pRepo->config.maxTables; int oldMaxTables = pRepo->config.maxTables;
if (oldMaxTables < pRepo->config.maxTables) { if (oldMaxTables < pRepo->config.maxTables) {
// TODO // TODO
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册