diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 9c356b0cbc71671ee8a7d917bf18c0b988f0cb1f..04826e43ac98852495b5951aaa0ba66315bf2480 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -150,7 +150,7 @@ void tdFreeSchema(STSchema *pSchema) { */ void tdUpdateSchema(STSchema *pSchema) { STColumn *pCol = NULL; - int32_t offset = 0; + int32_t offset = TD_DATA_ROW_HEAD_SIZE; for (int i = 0; i < schemaNCols(pSchema); i++) { pCol = schemaColAt(pSchema, i); colSetOffset(pCol, offset); diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 4964ac673f1b25d351d3eb5f0e8e146d510776db..1368515cfdf6b85716173611a45d513e134aa730 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -59,6 +59,8 @@ tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbTriggerCommit(tsdb_repo_t *repo); +int32_t tsdbLockRepo(tsdb_repo_t *repo); +int32_t tsdbUnLockRepo(tsdb_repo_t *repo); // --------- TSDB TABLE DEFINITION typedef struct { diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 2676829c755df47f6586d617818e610b424ef2b5..3e9eabc90d0eb2b177e7da19ef6af892f064c23d 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -19,6 +19,7 @@ #include "taosdef.h" #include "tlist.h" +#include "tsdb.h" #ifdef __cplusplus extern "C" { @@ -49,13 +50,15 @@ typedef struct { typedef struct { int maxBytes; int cacheBlockSize; + int totalCacheBlocks; STsdbCachePool pool; STsdbCacheBlock *curBlock; SCacheMem * mem; SCacheMem * imem; + tsdb_repo_t * pRepo; } STsdbCache; -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo); void tsdbFreeCache(STsdbCache *pCache); void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key); diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 3b1d44e6c7055e78f83a33d0808d6fc2ae0599b6..f51c7c12d4904d9abd695826d327050f399fb50a 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -14,12 +14,13 @@ */ #include +#include "tsdb.h" #include "tsdbCache.h" static int tsdbAllocBlockFromPool(STsdbCache *pCache); static void tsdbFreeBlockList(SCacheMem *mem); -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); if (pCache == NULL) return NULL; @@ -27,9 +28,11 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { pCache->maxBytes = maxBytes; pCache->cacheBlockSize = cacheBlockSize; + pCache->pRepo = pRepo; int nBlocks = maxBytes / cacheBlockSize + 1; if (nBlocks <= 1) nBlocks = 2; + pCache->totalCacheBlocks = nBlocks; STsdbCachePool *pPool = &(pCache->pool); pPool->index = 0; @@ -67,22 +70,10 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { if (pCache == NULL) return NULL; if (bytes > pCache->cacheBlockSize) return NULL; - if (pCache->mem == NULL) { // Create a new one - pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); - if (pCache->mem == NULL) return NULL; - pCache->mem->keyFirst = INT64_MAX; - pCache->mem->keyLast = 0; - pCache->mem->numOfPoints = 0; - pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); - } - - if (isListEmpty(pCache->mem->list)) { - if (tsdbAllocBlockFromPool(pCache) < 0) { - // TODO: deal with the error + if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) { + if (pCache->curBlock !=NULL && (pCache->mem->list) >= pCache->totalCacheBlocks/2) { + tsdbTriggerCommit(pCache->pRepo); } - } - - if (pCache->curBlock->remain < bytes) { if (tsdbAllocBlockFromPool(pCache) < 0) { // TODO: deal with the error } @@ -115,7 +106,12 @@ static void tsdbFreeBlockList(SCacheMem *mem) { static int tsdbAllocBlockFromPool(STsdbCache *pCache) { STsdbCachePool *pPool = &(pCache->pool); - if (listNEles(pPool->memPool) == 0) return -1; + + tsdbLockRepo(pCache->pRepo); + if (listNEles(pPool->memPool) == 0) { + tsdbUnLockRepo(pCache->pRepo); + return -1; + } SListNode *node = tdListPopHead(pPool->memPool); @@ -125,8 +121,19 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { pBlock->offset = 0; pBlock->remain = pCache->cacheBlockSize; + if (pCache->mem == NULL) { // Create a new one + pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); + if (pCache->mem == NULL) return NULL; + pCache->mem->keyFirst = INT64_MAX; + pCache->mem->keyLast = 0; + pCache->mem->numOfPoints = 0; + pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); + } + tdListAppendNode(pCache->mem->list, node); pCache->curBlock = pBlock; + tsdbUnLockRepo(pCache->pRepo); + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index cd673d2d89a64b6ffb653efca9fbbe1ea0415f58..a8a80dd1641ac0939187ec9abd24fb24ddef0246 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -150,6 +150,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->rootDir = strdup(rootDir); pRepo->config = *pCfg; pRepo->limiter = limiter; + pthread_mutex_init(&pRepo->mutex, NULL); // Create the environment files and directories if (tsdbSetRepoEnv(pRepo) < 0) { @@ -168,7 +169,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->tsdbMeta = pMeta; // Initialize cache - STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1); + STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo); if (pCache == NULL) { free(pRepo->rootDir); tsdbFreeMeta(pRepo->tsdbMeta); @@ -249,7 +250,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } - pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1); + pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1, (tsdb_repo_t *)pRepo); if (pRepo->tsdbCache == NULL) { tsdbFreeMeta(pRepo->tsdbMeta); free(pRepo->rootDir); @@ -305,9 +306,12 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) { int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; - - if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; - if (pRepo->commit) return 0; + + tsdbLockRepo(repo); + if (pRepo->commit) { + tsdbUnLockRepo(repo); + return -1; + } pRepo->commit = 1; // Loop to move pData to iData for (int i = 0; i < pRepo->config.maxTables; i++) { @@ -320,15 +324,25 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { // TODO: Loop to move mem to imem pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->mem = NULL; + pRepo->tsdbCache->curBlock = NULL; + // TODO: here should set as detached or use join for memory leak pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); - pthread_mutex_unlock(&(pRepo->mutex)); - - pthread_join(pRepo->commitThread, NULL); + tsdbUnLockRepo(repo); return 0; } +int32_t tsdbLockRepo(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + return pthread_mutex_lock(repo); +} + +int32_t tsdbUnLockRepo(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + return pthread_mutex_unlock(repo); +} + /** * Get the TSDB repository information, including some statistics * @param pRepo the TSDB repository handle @@ -691,6 +705,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable dataRowCpy(SL_GET_NODE_DATA(pNode), row); // Insert the skiplist node into the data + if (pTable->mem == NULL) { + pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); + if (pTable->mem == NULL) return -1; + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->keyFirst = INT64_MAX; + pTable->mem->keyLast = 0; + } tSkipListPut(pTable->mem->pData, pNode); if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; @@ -718,7 +739,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { return 0; } -static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, void *dst) { +static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCol **cols, STSchema *pSchema) { int numOfRows = 0; do { SSkipListNode *node = tSkipListIterGet(pIter); @@ -727,6 +748,11 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max SDataRow row = SL_GET_NODE_DATA(node); if (dataRowKey(row) > maxKey) break; // Convert row data to column data + // for (int i = 0; i < schemaNCols(pSchema); i++) { + // STColumn *pCol = schemaColAt(pSchema, i); + // memcpy(cols[i]->data + TYPE_BYTES[colType(pCol)] * numOfRows, dataRowAt(row, pCol->offset), + // TYPE_BYTES[colType(pCol)]); + // } numOfRows++; if (numOfRows > maxRowsToRead) break; @@ -754,6 +780,8 @@ static void *tsdbCommitToFile(void *arg) { int maxCols = pMeta->maxCols; int maxBytes = pMeta->maxRowBytes; + SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); + void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); for (int fid = sfid; fid <= efid; fid++) { TSKEY minKey = 0, maxKey = 0; @@ -771,9 +799,17 @@ static void *tsdbCommitToFile(void *arg) { } } + // Init row data part + cols[0] = (SDataCol *)buf; + for (int col = 1; col < schemaNCols(pTable->schema); col++) { + cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); + } + // Loop the iterator int rowsRead = 0; - while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, NULL)) > 0) { + while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > + 0) { + // printf("rowsRead:%d-----------\n", rowsRead); int k = 0; } } @@ -784,7 +820,23 @@ static void *tsdbCommitToFile(void *arg) { if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]); } + free(buf); + free(cols); free(iters); + tsdbLockRepo(arg); + tdListMove(pCache->imem->list, pCache->pool.memPool); + free(pCache->imem); + pCache->imem = NULL; + pRepo->commit = 0; + // TODO: free the skiplist + for (int i = 0; i < pCfg->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->imem) { // Here has memory leak + pTable->imem = NULL; + } + } + tsdbUnLockRepo(arg); + return NULL; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index b8b5450d232eb5453ed7890a3a101bb4be19492d..72c1667fe1862b49c1d351f33e78e59ba597afef 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -236,6 +236,10 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { table->type = TSDB_NORMAL_TABLE; table->superUid = -1; table->schema = tdDupSchema(pCfg->schema); + if (schemaNCols(table->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(table->schema); + tdUpdateSchema(table->schema); + int bytes = tdMaxRowBytesFromSchema(table->schema); + if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes; } // Register to meta diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index de58d6337c376f87d89603f5d7a3f9d23390bf30..459d531c10732c3ff905627454deea458149a386 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -1,12 +1,19 @@ #include #include +#include #include "tsdb.h" #include "dataformat.h" #include "tsdbFile.h" #include "tsdbMeta.h" -TEST(TsdbTest, tableEncodeDecode) { +double getCurTime() { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec + tv.tv_usec * 1E-6; +} + +TEST(TsdbTest, DISABLED_tableEncodeDecode) { STable *pTable = (STable *)malloc(sizeof(STable)); pTable->type = TSDB_NORMAL_TABLE; @@ -71,19 +78,22 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 1000; - int rowsPerSubmit = 10; + int nRows = 10000000; + int rowsPerSubmit = 100; int64_t start_time = 1584081000000; SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); + double stime = getCurTime(); + for (int k = 0; k < nRows/rowsPerSubmit; k++) { SSubmitBlk *pBlock = pMsg->blocks; pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; pBlock->sversion = 0; pBlock->len = 0; for (int i = 0; i < rowsPerSubmit; i++) { - start_time += 1000; + // start_time += 1000; + start_time -= 1000; SDataRow row = (SDataRow)(pBlock->data + pBlock->len); tdInitDataRow(row, schema); @@ -102,16 +112,22 @@ TEST(TsdbTest, createRepo) { tsdbInsertData(pRepo, pMsg); } - tsdbTriggerCommit(pRepo); + double etime = getCurTime(); + + printf("Spent %f seconds to write %d records\n", etime - stime, nRows); + + + + // tsdbTriggerCommit(pRepo); } -TEST(TsdbTest, openRepo) { +TEST(TsdbTest, DISABLED_openRepo) { tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); ASSERT_NE(pRepo, nullptr); } -TEST(TsdbTest, createFileGroup) { +TEST(TsdbTest, DISABLED_createFileGroup) { SFileGroup fGroup; ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);