diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index aaedc7672677dcd2c27d961412955c0ad519d326..bdb12a59f9776eaf2924c6c08c35c1b204e2065a 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -100,6 +100,8 @@ SListNode *tdListPopHead(SList *list) { list->head = node->next; } list->numOfEles--; + node->next = NULL; + node->prev = NULL; return node; } @@ -113,6 +115,7 @@ SListNode *tdListPopTail(SList *list) { list->tail = node->prev; } list->numOfEles--; + node->next = node->prev = NULL; return node; } @@ -131,6 +134,7 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { node->next->prev = node->prev; } list->numOfEles--; + node->next = node->prev = NULL; return node; } diff --git a/src/util/tests/stringTest.cpp b/src/util/tests/stringTest.cpp index b1b06c7f490fbe8dd5adcb0d442f3e1034833bc1..ef8df90e3eac8c4381cbc23cd5311b794b73d058 100644 --- a/src/util/tests/stringTest.cpp +++ b/src/util/tests/stringTest.cpp @@ -157,22 +157,22 @@ TEST(testCase, string_strnchr_test) { EXPECT_TRUE(strnchr(a10, '.', strlen(a10), true) == NULL); } -TEST(testCase, cache_resize_test) { - char a11[] = "abc'.'"; - EXPECT_TRUE(strnchr(a11, '.', strlen(a11), false) != NULL); +// TEST(testCase, cache_resize_test) { +// char a11[] = "abc'.'"; +// EXPECT_TRUE(strnchr(a11, '.', strlen(a11), false) != NULL); - char a12[] = "abc'-'"; - EXPECT_TRUE(strnchr(a12, '-', strlen(a12), false) != NULL); +// char a12[] = "abc'-'"; +// EXPECT_TRUE(strnchr(a12, '-', strlen(a12), false) != NULL); - char a15[] = "abc'-'"; - EXPECT_TRUE(strnchr(a15, '-', strlen(a15), true) == NULL); +// char a15[] = "abc'-'"; +// EXPECT_TRUE(strnchr(a15, '-', strlen(a15), true) == NULL); - char a13[] = "'-'"; - EXPECT_TRUE(strnchr(a13, '-', strlen(a13), false) != NULL); +// char a13[] = "'-'"; +// EXPECT_TRUE(strnchr(a13, '-', strlen(a13), false) != NULL); - char a14[] = "'-'"; - EXPECT_TRUE(strnchr(a14, '-', strlen(a14), true) == NULL); +// char a14[] = "'-'"; +// EXPECT_TRUE(strnchr(a14, '-', strlen(a14), true) == NULL); - char a16[] = "'-'."; - EXPECT_TRUE(strnchr(a16, '.', strlen(a16), true) != NULL); -} \ No newline at end of file +// char a16[] = "'-'."; +// EXPECT_TRUE(strnchr(a16, '.', strlen(a16), true) != NULL); +// } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 769fc238153987b4cf14e09ca2692d260fdf9b3a..c45a8407cc1216f17bd74a57ce3e80f31ccf5464 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -345,12 +345,12 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { int32_t tsdbLockRepo(tsdb_repo_t *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; - return pthread_mutex_lock(repo); + return pthread_mutex_lock(&(pRepo->mutex)); } int32_t tsdbUnLockRepo(tsdb_repo_t *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; - return pthread_mutex_unlock(repo); + return pthread_mutex_unlock(&(pRepo->mutex)); } /** @@ -900,6 +900,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); if (!hasDataToCommit) return 0; // No data to commit, just return + // TODO: make it more flexible + pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000); + // Create and open files for commit tsdbGetDataDirName(pRepo, dataDir); if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ @@ -908,7 +911,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (pGroup == NULL) { /* TODO */ } tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0); - if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { + if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { // TODO: make it not to write the last file every time tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); isNewLastFile = 1; @@ -929,6 +932,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters SSkipListIterator *pIter = iters[tid]; SCompIdx * pIdx = &pIndices[tid]; + int nNewBlocks = 0; + if (pTable == NULL || pIter == NULL) continue; /* If no new data to write for this table, just write the old data to new file @@ -936,8 +941,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters */ if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { // has old data - if (pIdx->offset > 0) { - if (isNewLastFile && pIdx->hasLast) { + if (pIdx->len > 0) { + goto _table_over; + // if (isNewLastFile && pIdx->hasLast) { + if (0) { // need to move the last block to new file if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ } @@ -976,9 +983,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters continue; } + pCompInfo->delimiter = TSDB_FILE_DELIMITER; + pCompInfo->checksum = 0; + pCompInfo->uid = pTable->tableId.uid; + // Load SCompBlock part if neccessary int isCompBlockLoaded = 0; - if (pIdx->offset > 0) { + if (0) { + // if (pIdx->offset > 0) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { // has last block || cache key overlap with commit key pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); @@ -998,34 +1010,50 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); if (pCols->numOfPoints == 0) break; - int pointsWritten = 0; - // { // TODO : try to write the block data to file - // if (!isCompBlockLoaded) { // Just append - // if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file - // lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END); - - // } else { - // if (isNewLastFile) { // write directly to .l file - - // } else { // write directly to .last file + int pointsWritten = pCols->numOfPoints; + // TODO: all write to the end of .data file + int64_t toffset = 0; + int32_t tlen = 0; + tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid); + + // Make the compBlock + SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++; + pTBlock->offset = toffset; + pTBlock->len = tlen; + pTBlock->keyFirst = dataColsKeyFirst(pCols); + pTBlock->keyLast = dataColsKeyLast(pCols); + pTBlock->last = 0; + pTBlock->algorithm = 0; + pTBlock->numOfPoints = pCols->numOfPoints; + pTBlock->sversion = pTable->sversion; + pTBlock->numOfSubBlocks = 1; + + if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols); - // } - // } - // } else { // Need to append - // // SCompBlock *pTBlock = NULL; - // } - // } - // pointsWritten = pCols->numOfPoints; tdPopDataColsPoints(pCols, pointsWritten); maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; } + +_table_over: // Write the SCompBlock part - if (isCompBlockLoaded) { - // merge the block into old and update pIdx + pIdx->offset = lseek(hFile.fd, 0, SEEK_END); + if (pIdx->len > 0) { + sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); + if (nNewBlocks > 0) { + write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks); + pIdx->len += (sizeof(SCompBlock) * nNewBlocks); + } } else { - // sendfile the SCompBlock part and update the pIdx + if (nNewBlocks > 0) { + write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks); + pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks; + } } + + pIdx->checksum = 0; + pIdx->numOfSuperBlocks += nNewBlocks; + pIdx->hasLast = 0; } // Write the SCompIdx part