diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 9c356b0cbc71671ee8a7d917bf18c0b988f0cb1f..04826e43ac98852495b5951aaa0ba66315bf2480 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -150,7 +150,7 @@ void tdFreeSchema(STSchema *pSchema) { */ void tdUpdateSchema(STSchema *pSchema) { STColumn *pCol = NULL; - int32_t offset = 0; + int32_t offset = TD_DATA_ROW_HEAD_SIZE; for (int i = 0; i < schemaNCols(pSchema); i++) { pCol = schemaColAt(pSchema, i); colSetOffset(pCol, offset); diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index b88b13e288d8ab95669eea25c9601ee1baed034f..92d8ad757b864062f459f92f7fcb4aa7fedebb08 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -59,6 +59,8 @@ tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbTriggerCommit(tsdb_repo_t *repo); +int32_t tsdbLockRepo(tsdb_repo_t *repo); +int32_t tsdbUnLockRepo(tsdb_repo_t *repo); // --------- TSDB TABLE DEFINITION typedef struct { diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 2676829c755df47f6586d617818e610b424ef2b5..3e9eabc90d0eb2b177e7da19ef6af892f064c23d 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -19,6 +19,7 @@ #include "taosdef.h" #include "tlist.h" +#include "tsdb.h" #ifdef __cplusplus extern "C" { @@ -49,13 +50,15 @@ typedef struct { typedef struct { int maxBytes; int cacheBlockSize; + int totalCacheBlocks; STsdbCachePool pool; STsdbCacheBlock *curBlock; SCacheMem * mem; SCacheMem * imem; + tsdb_repo_t * pRepo; } STsdbCache; -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo); void tsdbFreeCache(STsdbCache *pCache); void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key); diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 3b1d44e6c7055e78f83a33d0808d6fc2ae0599b6..3496f6a5c8c480bde799adeb04ca4e6668057820 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -14,12 +14,14 @@ */ #include +#include "tsdb.h" #include "tsdbCache.h" -static int tsdbAllocBlockFromPool(STsdbCache *pCache); -static void tsdbFreeBlockList(SCacheMem *mem); +static int tsdbAllocBlockFromPool(STsdbCache *pCache); +static void tsdbFreeBlockList(SList *list); +static void tsdbFreeCacheMem(SCacheMem *mem); -STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); if (pCache == NULL) return NULL; @@ -27,9 +29,11 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { pCache->maxBytes = maxBytes; pCache->cacheBlockSize = cacheBlockSize; + pCache->pRepo = pRepo; int nBlocks = maxBytes / cacheBlockSize + 1; if (nBlocks <= 1) nBlocks = 2; + pCache->totalCacheBlocks = nBlocks; STsdbCachePool *pPool = &(pCache->pool); pPool->index = 0; @@ -57,8 +61,8 @@ _err: } void tsdbFreeCache(STsdbCache *pCache) { - tsdbFreeBlockList(pCache->imem); - tsdbFreeBlockList(pCache->mem); + tsdbFreeCacheMem(pCache->imem); + tsdbFreeCacheMem(pCache->mem); tsdbFreeBlockList(pCache->pool.memPool); free(pCache); } @@ -67,22 +71,10 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { if (pCache == NULL) return NULL; if (bytes > pCache->cacheBlockSize) return NULL; - if (pCache->mem == NULL) { // Create a new one - pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); - if (pCache->mem == NULL) return NULL; - pCache->mem->keyFirst = INT64_MAX; - pCache->mem->keyLast = 0; - pCache->mem->numOfPoints = 0; - pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); - } - - if (isListEmpty(pCache->mem->list)) { - if (tsdbAllocBlockFromPool(pCache) < 0) { - // TODO: deal with the error + if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) { + if (pCache->curBlock !=NULL && (pCache->mem->list) >= pCache->totalCacheBlocks/2) { + tsdbTriggerCommit(pCache->pRepo); } - } - - if (pCache->curBlock->remain < bytes) { if (tsdbAllocBlockFromPool(pCache) < 0) { // TODO: deal with the error } @@ -99,9 +91,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { return ptr; } -static void tsdbFreeBlockList(SCacheMem *mem) { - if (mem == NULL) return; - SList * list = mem->list; +static void tsdbFreeBlockList(SList *list) { SListNode * node = NULL; STsdbCacheBlock *pBlock = NULL; while ((node = tdListPopHead(list)) != NULL) { @@ -110,12 +100,23 @@ static void tsdbFreeBlockList(SCacheMem *mem) { listNodeFree(node); } tdListFree(list); +} + +static void tsdbFreeCacheMem(SCacheMem *mem) { + if (mem == NULL) return; + SList *list = mem->list; + tsdbFreeBlockList(list); free(mem); } static int tsdbAllocBlockFromPool(STsdbCache *pCache) { STsdbCachePool *pPool = &(pCache->pool); - if (listNEles(pPool->memPool) == 0) return -1; + + tsdbLockRepo(pCache->pRepo); + if (listNEles(pPool->memPool) == 0) { + tsdbUnLockRepo(pCache->pRepo); + return -1; + } SListNode *node = tdListPopHead(pPool->memPool); @@ -125,8 +126,19 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { pBlock->offset = 0; pBlock->remain = pCache->cacheBlockSize; + if (pCache->mem == NULL) { // Create a new one + pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); + if (pCache->mem == NULL) return NULL; + pCache->mem->keyFirst = INT64_MAX; + pCache->mem->keyLast = 0; + pCache->mem->numOfPoints = 0; + pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); + } + tdListAppendNode(pCache->mem->list, node); pCache->curBlock = pBlock; + tsdbUnLockRepo(pCache->pRepo); + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 6c8e9986302fd2cafb0827a3aa5e3c162d0aba58..2df7974844943eaa90947f45baa66724d2240cb5 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -150,6 +150,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->rootDir = strdup(rootDir); pRepo->config = *pCfg; pRepo->limiter = limiter; + pthread_mutex_init(&pRepo->mutex, NULL); // Create the environment files and directories if (tsdbSetRepoEnv(pRepo) < 0) { @@ -168,7 +169,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->tsdbMeta = pMeta; // Initialize cache - STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1); + STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo); if (pCache == NULL) { free(pRepo->rootDir); tsdbFreeMeta(pRepo->tsdbMeta); @@ -249,7 +250,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } - pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1); + pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1, (tsdb_repo_t *)pRepo); if (pRepo->tsdbCache == NULL) { tsdbFreeMeta(pRepo->tsdbMeta); free(pRepo->rootDir); @@ -305,9 +306,12 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) { int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; - - if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; - if (pRepo->commit) return 0; + + tsdbLockRepo(repo); + if (pRepo->commit) { + tsdbUnLockRepo(repo); + return -1; + } pRepo->commit = 1; // Loop to move pData to iData for (int i = 0; i < pRepo->config.maxTables; i++) { @@ -320,15 +324,25 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { // TODO: Loop to move mem to imem pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->mem = NULL; + pRepo->tsdbCache->curBlock = NULL; + // TODO: here should set as detached or use join for memory leak pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); - pthread_mutex_unlock(&(pRepo->mutex)); - - pthread_join(pRepo->commitThread, NULL); + tsdbUnLockRepo(repo); return 0; } +int32_t tsdbLockRepo(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + return pthread_mutex_lock(repo); +} + +int32_t tsdbUnLockRepo(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + return pthread_mutex_unlock(repo); +} + /** * Get the TSDB repository information, including some statistics * @param pRepo the TSDB repository handle @@ -698,7 +712,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize); TSKEY key = dataRowKey(row); - printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints); + // printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints); // Copy row into the memory SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key); @@ -710,6 +724,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable dataRowCpy(SL_GET_NODE_DATA(pNode), row); // Insert the skiplist node into the data + if (pTable->mem == NULL) { + pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); + if (pTable->mem == NULL) return -1; + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->keyFirst = INT64_MAX; + pTable->mem->keyLast = 0; + } tSkipListPut(pTable->mem->pData, pNode); if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; @@ -740,7 +761,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { return 0; } -static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, void *dst) { +static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCol **cols, STSchema *pSchema) { int numOfRows = 0; do { SSkipListNode *node = tSkipListIterGet(pIter); @@ -749,6 +770,11 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max SDataRow row = SL_GET_NODE_DATA(node); if (dataRowKey(row) > maxKey) break; // Convert row data to column data + // for (int i = 0; i < schemaNCols(pSchema); i++) { + // STColumn *pCol = schemaColAt(pSchema, i); + // memcpy(cols[i]->data + TYPE_BYTES[colType(pCol)] * numOfRows, dataRowAt(row, pCol->offset), + // TYPE_BYTES[colType(pCol)]); + // } numOfRows++; if (numOfRows > maxRowsToRead) break; @@ -776,6 +802,8 @@ static void *tsdbCommitToFile(void *arg) { int maxCols = pMeta->maxCols; int maxBytes = pMeta->maxRowBytes; + SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); + void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); for (int fid = sfid; fid <= efid; fid++) { TSKEY minKey = 0, maxKey = 0; @@ -793,9 +821,17 @@ static void *tsdbCommitToFile(void *arg) { } } + // Init row data part + cols[0] = (SDataCol *)buf; + for (int col = 1; col < schemaNCols(pTable->schema); col++) { + cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); + } + // Loop the iterator int rowsRead = 0; - while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, NULL)) > 0) { + while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > + 0) { + // printf("rowsRead:%d-----------\n", rowsRead); int k = 0; } } @@ -806,7 +842,23 @@ static void *tsdbCommitToFile(void *arg) { if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]); } + free(buf); + free(cols); free(iters); + tsdbLockRepo(arg); + tdListMove(pCache->imem->list, pCache->pool.memPool); + free(pCache->imem); + pCache->imem = NULL; + pRepo->commit = 0; + // TODO: free the skiplist + for (int i = 0; i < pCfg->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->imem) { // Here has memory leak + pTable->imem = NULL; + } + } + tsdbUnLockRepo(arg); + return NULL; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index b8b5450d232eb5453ed7890a3a101bb4be19492d..72c1667fe1862b49c1d351f33e78e59ba597afef 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -236,6 +236,10 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { table->type = TSDB_NORMAL_TABLE; table->superUid = -1; table->schema = tdDupSchema(pCfg->schema); + if (schemaNCols(table->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(table->schema); + tdUpdateSchema(table->schema); + int bytes = tdMaxRowBytesFromSchema(table->schema); + if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes; } // Register to meta diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index de58d6337c376f87d89603f5d7a3f9d23390bf30..a76aef2d41d154ca64ac48bfb3a5075309fe3f7f 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -1,12 +1,19 @@ #include #include +#include #include "tsdb.h" #include "dataformat.h" #include "tsdbFile.h" #include "tsdbMeta.h" -TEST(TsdbTest, tableEncodeDecode) { +double getCurTime() { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec + tv.tv_usec * 1E-6; +} + +TEST(TsdbTest, DISABLED_tableEncodeDecode) { STable *pTable = (STable *)malloc(sizeof(STable)); pTable->type = TSDB_NORMAL_TABLE; @@ -71,19 +78,23 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 1000; + int nRows = 10000000; int rowsPerSubmit = 10; int64_t start_time = 1584081000000; SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); + double stime = getCurTime(); + for (int k = 0; k < nRows/rowsPerSubmit; k++) { SSubmitBlk *pBlock = pMsg->blocks; - pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; + pBlock->uid = 987607499877672L; + pBlock->tid = 0; pBlock->sversion = 0; pBlock->len = 0; for (int i = 0; i < rowsPerSubmit; i++) { - start_time += 1000; + // start_time += 1000; + start_time -= 1000; SDataRow row = (SDataRow)(pBlock->data + pBlock->len); tdInitDataRow(row, schema); @@ -97,21 +108,38 @@ TEST(TsdbTest, createRepo) { } pBlock->len += dataRowLen(row); } + pBlock->len = htonl(pBlock->len); + pBlock->numOfRows = htonl(pBlock->numOfRows); + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + + pBlock->sversion = htonl(pBlock->sversion); + pBlock->padding = htonl(pBlock->padding); + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + pMsg->compressed = htonl(pMsg->numOfBlocks); tsdbInsertData(pRepo, pMsg); } - tsdbTriggerCommit(pRepo); + double etime = getCurTime(); + + printf("Spent %f seconds to write %d records\n", etime - stime, nRows); + + + + // tsdbTriggerCommit(pRepo); } -TEST(TsdbTest, openRepo) { +TEST(TsdbTest, DISABLED_openRepo) { tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); ASSERT_NE(pRepo, nullptr); } -TEST(TsdbTest, createFileGroup) { +TEST(TsdbTest, DISABLED_createFileGroup) { SFileGroup fGroup; ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);