提交 bf06466c 编写于 作者: H hzcheng

add change repository compression options

上级 86e69f8d
...@@ -173,6 +173,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value") ...@@ -173,6 +173,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value")
// others // others
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format")
// TSDB
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONFIG, 0, 550, "invalid TSDB configuration")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
......
...@@ -160,6 +160,7 @@ typedef struct { ...@@ -160,6 +160,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t index; int64_t index;
int numOfCacheBlocks;
SList * memPool; SList * memPool;
} STsdbCachePool; } STsdbCachePool;
...@@ -488,6 +489,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper); ...@@ -488,6 +489,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper);
// --------- Other functions need to further organize // --------- Other functions need to further organize
void tsdbFitRetention(STsdbRepo *pRepo); void tsdbFitRetention(STsdbRepo *pRepo);
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
void tsdbAdjustCacheBlocks(STsdbCache *pCache);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
static int tsdbAllocBlockFromPool(STsdbCache *pCache); static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list); static void tsdbFreeBlockList(SList *list);
static void tsdbFreeCacheMem(SCacheMem *mem); static void tsdbFreeCacheMem(SCacheMem *mem);
static int tsdbAddCacheBlockToPool(STsdbCache *pCache);
STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) { STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) {
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
...@@ -40,13 +41,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) ...@@ -40,13 +41,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo)
if (pPool->memPool == NULL) goto _err; if (pPool->memPool == NULL) goto _err;
for (int i = 0; i < totalBlocks; i++) { for (int i = 0; i < totalBlocks; i++) {
STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); if (tsdbAddCacheBlockToPool(pCache) < 0) goto _err;
if (pBlock == NULL) {
goto _err;
}
pBlock->offset = 0;
pBlock->remain = cacheBlockSize;
tdListAppend(pPool->memPool, (void *)(&pBlock));
} }
pCache->mem = NULL; pCache->mem = NULL;
...@@ -143,3 +138,69 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { ...@@ -143,3 +138,69 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
return 0; return 0;
} }
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
STsdbCache *pCache = pRepo->tsdbCache;
int oldNumOfBlocks = pCache->totalCacheBlocks;
tsdbLockRepo((TsdbRepoT *)pRepo);
ASSERT(pCache->totalCacheBlocks != totalBlocks);
if (pCache->totalCacheBlocks < totalBlocks) {
ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks);
int blocksToAdd = pCache->totalCacheBlocks - totalBlocks;
pCache->totalCacheBlocks = totalBlocks;
for (int i = 0; i < blocksToAdd; i++) {
if (tsdbAddCacheBlockToPool(pCache) < 0) {
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbError("tsdbId %d: failed to add cache block to cache pool", pRepo->config.tsdbId);
return -1;
}
}
} else {
pCache->totalCacheBlocks = totalBlocks;
tsdbAdjustCacheBlocks(pCache);
}
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbTrace("tsdbId %d: tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks);
return 0;
}
static int tsdbAddCacheBlockToPool(STsdbCache *pCache) {
STsdbCachePool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize);
if (pBlock == NULL) return -1;
pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize;
tdListAppend(pPool->memPool, (void *)(&pBlock));
pPool->numOfCacheBlocks++;
return 0;
}
static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) {
STsdbCachePool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = NULL;
ASSERT(pCache->totalCacheBlocks >= 0);
SListNode *node = tdListPopHead(pPool->memPool);
if (node == NULL) return -1;
tdListNodeGetData(pPool->memPool, node, &pBlock);
free(pBlock);
listNodeFree(node);
pPool->numOfCacheBlocks--;
return 0;
}
void tsdbAdjustCacheBlocks(STsdbCache *pCache) {
while (pCache->totalCacheBlocks < pCache->pool.numOfCacheBlocks) {
if (tsdbRemoveCacheBlockFromPool(pCache) < 0) break;
}
}
\ No newline at end of file
...@@ -47,7 +47,7 @@ STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) { ...@@ -47,7 +47,7 @@ STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) {
return NULL; return NULL;
} }
pFileH->maxFGroups = pCfg->keep / pCfg->daysPerFile + 2; pFileH->maxFGroups = pCfg->keep / pCfg->daysPerFile + 3;
pFileH->fGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); pFileH->fGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup));
if (pFileH->fGroup == NULL) { if (pFileH->fGroup == NULL) {
......
...@@ -36,8 +36,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **i ...@@ -36,8 +36,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **i
SDataCols *pDataCols); SDataCols *pDataCols);
static TSKEY tsdbNextIterKey(SSkipListIterator *pIter); static TSKEY tsdbNextIterKey(SSkipListIterator *pIter);
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey);
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
// int64_t uid); static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...@@ -298,10 +299,23 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { ...@@ -298,10 +299,23 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
*/ */
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) { int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg * pRCfg = &pRepo->config;
pRepo->config = *pCfg; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_INVALID_CONFIG;
// TODO
return 0; ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
if (pRCfg->compression != pCfg->compression) tsdbAlterCompression(pRepo, pCfg->compression);
if (pRCfg->keep != pCfg->keep) tsdbAlterKeep(pRepo, pCfg->keep);
if (pRCfg->totalBlocks != pCfg->totalBlocks) tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
if (pRCfg->maxTables != pCfg->maxTables) tsdbAlterMaxTables(pRepo, pCfg->maxTables);
return TSDB_CODE_SUCCESS;
} }
int32_t tsdbTriggerCommit(TsdbRepoT *repo) { int32_t tsdbTriggerCommit(TsdbRepoT *repo) {
...@@ -927,6 +941,7 @@ _exit: ...@@ -927,6 +941,7 @@ _exit:
tsdbLockRepo(arg); tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool); tdListMove(pCache->imem->list, pCache->pool.memPool);
tsdbAdjustCacheBlocks(pCache);
tdListFree(pCache->imem->list); tdListFree(pCache->imem->list);
free(pCache->imem); free(pCache->imem);
pCache->imem = NULL; pCache->imem = NULL;
...@@ -1046,3 +1061,26 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK ...@@ -1046,3 +1061,26 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK
} }
return 0; return 0;
} }
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
pRepo->config.compression = compression;
}
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
STsdbCfg *pCfg = &pRepo->config;
int maxFiles = keep / pCfg->maxTables + 3;
if (pRepo->config.keep > keep) {
pRepo->tsdbFileH->maxFGroups = maxFiles;
} else {
pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup));
if (pRepo->tsdbFileH->fGroup == NULL) {
// TODO: deal with the error
}
pRepo->tsdbFileH->maxFGroups = maxFiles;
}
}
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
// TODO
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册