From ed7d5fe85019cd8c0d201af31356ac8fc8a637f5 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sun, 22 Mar 2020 22:35:10 +0800 Subject: [PATCH] TD-34 --- src/vnode/tsdb/inc/tsdb.h | 2 ++ src/vnode/tsdb/inc/tsdbCache.h | 5 ++- src/vnode/tsdb/src/tsdbCache.c | 41 +++++++++++++---------- src/vnode/tsdb/src/tsdbMain.c | 52 +++++++++++++++++++++++++----- src/vnode/tsdb/tests/tsdbTests.cpp | 24 +++++++++++--- 5 files changed, 94 insertions(+), 30 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 4964ac673f..1368515cfd 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -59,6 +59,8 @@ tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbTriggerCommit(tsdb_repo_t *repo); +int32_t tsdbLockRepo(tsdb_repo_t *repo); +int32_t tsdbUnLockRepo(tsdb_repo_t *repo); // --------- TSDB TABLE DEFINITION typedef struct { diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 2676829c75..3e9eabc90d 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -19,6 +19,7 @@ #include "taosdef.h" #include "tlist.h" +#include "tsdb.h" #ifdef __cplusplus extern "C" { @@ -49,13 +50,15 @@ typedef struct { typedef struct { int maxBytes; int cacheBlockSize; + int totalCacheBlocks; STsdbCachePool pool; STsdbCacheBlock *curBlock; SCacheMem * mem; SCacheMem * imem; + tsdb_repo_t * pRepo; } STsdbCache; -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo); void tsdbFreeCache(STsdbCache *pCache); void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key); diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 3b1d44e6c7..f51c7c12d4 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -14,12 +14,13 @@ */ #include +#include "tsdb.h" #include "tsdbCache.h" static int tsdbAllocBlockFromPool(STsdbCache *pCache); static void tsdbFreeBlockList(SCacheMem *mem); -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); if (pCache == NULL) return NULL; @@ -27,9 +28,11 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { pCache->maxBytes = maxBytes; pCache->cacheBlockSize = cacheBlockSize; + pCache->pRepo = pRepo; int nBlocks = maxBytes / cacheBlockSize + 1; if (nBlocks <= 1) nBlocks = 2; + pCache->totalCacheBlocks = nBlocks; STsdbCachePool *pPool = &(pCache->pool); pPool->index = 0; @@ -67,22 +70,10 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { if (pCache == NULL) return NULL; if (bytes > pCache->cacheBlockSize) return NULL; - if (pCache->mem == NULL) { // Create a new one - pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); - if (pCache->mem == NULL) return NULL; - pCache->mem->keyFirst = INT64_MAX; - pCache->mem->keyLast = 0; - pCache->mem->numOfPoints = 0; - pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); - } - - if (isListEmpty(pCache->mem->list)) { - if (tsdbAllocBlockFromPool(pCache) < 0) { - // TODO: deal with the error + if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) { + if (pCache->curBlock !=NULL && (pCache->mem->list) >= pCache->totalCacheBlocks/2) { + tsdbTriggerCommit(pCache->pRepo); } - } - - if (pCache->curBlock->remain < bytes) { if (tsdbAllocBlockFromPool(pCache) < 0) { // TODO: deal with the error } @@ -115,7 +106,12 @@ static void tsdbFreeBlockList(SCacheMem *mem) { static int tsdbAllocBlockFromPool(STsdbCache *pCache) { STsdbCachePool *pPool = &(pCache->pool); - if (listNEles(pPool->memPool) == 0) return -1; + + tsdbLockRepo(pCache->pRepo); + if (listNEles(pPool->memPool) == 0) { + tsdbUnLockRepo(pCache->pRepo); + return -1; + } SListNode *node = tdListPopHead(pPool->memPool); @@ -125,8 +121,19 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { pBlock->offset = 0; pBlock->remain = pCache->cacheBlockSize; + if (pCache->mem == NULL) { // Create a new one + pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); + if (pCache->mem == NULL) return NULL; + pCache->mem->keyFirst = INT64_MAX; + pCache->mem->keyLast = 0; + pCache->mem->numOfPoints = 0; + pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); + } + tdListAppendNode(pCache->mem->list, node); pCache->curBlock = pBlock; + tsdbUnLockRepo(pCache->pRepo); + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 7da80cb50f..a8a80dd164 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -150,6 +150,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->rootDir = strdup(rootDir); pRepo->config = *pCfg; pRepo->limiter = limiter; + pthread_mutex_init(&pRepo->mutex, NULL); // Create the environment files and directories if (tsdbSetRepoEnv(pRepo) < 0) { @@ -168,7 +169,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->tsdbMeta = pMeta; // Initialize cache - STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1); + STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo); if (pCache == NULL) { free(pRepo->rootDir); tsdbFreeMeta(pRepo->tsdbMeta); @@ -249,7 +250,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } - pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1); + pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1, (tsdb_repo_t *)pRepo); if (pRepo->tsdbCache == NULL) { tsdbFreeMeta(pRepo->tsdbMeta); free(pRepo->rootDir); @@ -305,9 +306,12 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) { int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; - - if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; - if (pRepo->commit) return 0; + + tsdbLockRepo(repo); + if (pRepo->commit) { + tsdbUnLockRepo(repo); + return -1; + } pRepo->commit = 1; // Loop to move pData to iData for (int i = 0; i < pRepo->config.maxTables; i++) { @@ -320,15 +324,25 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { // TODO: Loop to move mem to imem pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->mem = NULL; + pRepo->tsdbCache->curBlock = NULL; + // TODO: here should set as detached or use join for memory leak pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); - pthread_mutex_unlock(&(pRepo->mutex)); - - pthread_join(pRepo->commitThread, NULL); + tsdbUnLockRepo(repo); return 0; } +int32_t tsdbLockRepo(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + return pthread_mutex_lock(repo); +} + +int32_t tsdbUnLockRepo(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + return pthread_mutex_unlock(repo); +} + /** * Get the TSDB repository information, including some statistics * @param pRepo the TSDB repository handle @@ -691,6 +705,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable dataRowCpy(SL_GET_NODE_DATA(pNode), row); // Insert the skiplist node into the data + if (pTable->mem == NULL) { + pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); + if (pTable->mem == NULL) return -1; + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->keyFirst = INT64_MAX; + pTable->mem->keyLast = 0; + } tSkipListPut(pTable->mem->pData, pNode); if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; @@ -788,6 +809,7 @@ static void *tsdbCommitToFile(void *arg) { int rowsRead = 0; while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > 0) { + // printf("rowsRead:%d-----------\n", rowsRead); int k = 0; } } @@ -802,5 +824,19 @@ static void *tsdbCommitToFile(void *arg) { free(cols); free(iters); + tsdbLockRepo(arg); + tdListMove(pCache->imem->list, pCache->pool.memPool); + free(pCache->imem); + pCache->imem = NULL; + pRepo->commit = 0; + // TODO: free the skiplist + for (int i = 0; i < pCfg->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->imem) { // Here has memory leak + pTable->imem = NULL; + } + } + tsdbUnLockRepo(arg); + return NULL; } \ No newline at end of file diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 8895258b1a..459d531c10 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -1,11 +1,18 @@ #include #include +#include #include "tsdb.h" #include "dataformat.h" #include "tsdbFile.h" #include "tsdbMeta.h" +double getCurTime() { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec + tv.tv_usec * 1E-6; +} + TEST(TsdbTest, DISABLED_tableEncodeDecode) { STable *pTable = (STable *)malloc(sizeof(STable)); @@ -71,19 +78,22 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 1000; - int rowsPerSubmit = 10; + int nRows = 10000000; + int rowsPerSubmit = 100; int64_t start_time = 1584081000000; SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); + double stime = getCurTime(); + for (int k = 0; k < nRows/rowsPerSubmit; k++) { SSubmitBlk *pBlock = pMsg->blocks; pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; pBlock->sversion = 0; pBlock->len = 0; for (int i = 0; i < rowsPerSubmit; i++) { - start_time += 1000; + // start_time += 1000; + start_time -= 1000; SDataRow row = (SDataRow)(pBlock->data + pBlock->len); tdInitDataRow(row, schema); @@ -102,7 +112,13 @@ TEST(TsdbTest, createRepo) { tsdbInsertData(pRepo, pMsg); } - tsdbTriggerCommit(pRepo); + double etime = getCurTime(); + + printf("Spent %f seconds to write %d records\n", etime - stime, nRows); + + + + // tsdbTriggerCommit(pRepo); } -- GitLab