diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 13dbf7eb8cab5f480ff77b6aafe524ef9d8e98f8..a4241cdf7d2d2d7a71414b9a657e2ef5f3d5a947 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -867,7 +867,7 @@ static void *tsdbCommitData(void *arg) { SRWHelper whelper = {0}; if (pCache->imem == NULL) return NULL; - pRepo->appH.walCallBack(pRepo->appH.appH); + if (pRepo->appH.walCallBack) pRepo->appH.walCallBack(pRepo->appH.appH); // Create the iterator to read from cache SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 1b5b0e765691a95040925f142f7b8b0cd2bc0c7d..5aeed0c50821a7a46476e4f27c453bcbe868f525 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -29,7 +29,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); -static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey); +static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey); static void tsdbResetHelperBlock(SRWHelper *pHelper); // ---------- Operations on Helper File part @@ -342,7 +342,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1); ASSERT(rowsToWrite > 0); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err; - if (tsdbInsertSuperBlock(pHelper, pCompBlock, blkIdx) < 0) goto _err; + if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } } } @@ -934,7 +934,15 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa return -1; } -static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); } +static int compTSKEY(const void *key1, const void *key2) { + if (*(TSKEY *)key1 > *(TSKEY *)key2) { + return 1; + } else if (*(TSKEY *)key1 == *(TSKEY *)key2) { + return 0; + } else { + return -1; + } +} static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { @@ -979,6 +987,10 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; + if (pIdx->numOfSuperBlocks > 1) { + ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); + } + return 0; _err: @@ -1104,7 +1116,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int } // Get the number of rows in range [minKey, maxKey] -static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) { +static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) { if (pDataCols->numOfPoints == 0) return 0; ASSERT(minKey <= maxKey); diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index 1fbfa4853b6c60a25fd17582af7e9db3dc74436b..58a7a0ae0867f1eb3716d9da67d2ff493ed6059c 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -13,14 +13,15 @@ static double getCurTime() { typedef struct { TsdbRepoT *pRepo; - int tid; - int64_t uid; - int sversion; - TSKEY startTime; - TSKEY interval; - int totalRows; - int rowsPerSubmit; - STSchema * pSchema; + bool isAscend; + int tid; + int64_t uid; + int sversion; + TSKEY startTime; + TSKEY interval; + int totalRows; + int rowsPerSubmit; + STSchema * pSchema; } SInsertInfo; static int insertData(SInsertInfo *pInfo) { @@ -41,7 +42,11 @@ static int insertData(SInsertInfo *pInfo) { pBlock->len = 0; for (int i = 0; i < pInfo->rowsPerSubmit; i++) { // start_time += 1000; - start_time += pInfo->interval; + if (pInfo->isAscend) { + start_time += pInfo->interval; + } else { + start_time -= pInfo->interval; + } SDataRow row = (SDataRow)(pBlock->data + pBlock->len); tdInitDataRow(row, pInfo->pSchema); @@ -155,12 +160,14 @@ TEST(TsdbTest, createRepo) { // Insert Some Data SInsertInfo iInfo = { .pRepo = pRepo, + // .isAscend = true, + .isAscend = false, .tid = tCfg.tableId.tid, .uid = tCfg.tableId.uid, .sversion = tCfg.sversion, .startTime = 1584081000000, .interval = 1000, - .totalRows = 50, + .totalRows = 5000000, .rowsPerSubmit = 1, .pSchema = schema }; @@ -175,34 +182,34 @@ TEST(TsdbTest, createRepo) { repo = (STsdbRepo *)pRepo; ASSERT_NE(pRepo, nullptr); - // Insert more data - iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows; - iInfo.totalRows = 10; - iInfo.pRepo = pRepo; - ASSERT_EQ(insertData(&iInfo), 0); + // // Insert more data + // iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows; + // iInfo.totalRows = 10; + // iInfo.pRepo = pRepo; + // ASSERT_EQ(insertData(&iInfo), 0); - // Close the repository - tsdbCloseRepo(pRepo); + // // Close the repository + // tsdbCloseRepo(pRepo); - // Open the repository again - pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); - repo = (STsdbRepo *)pRepo; - ASSERT_NE(pRepo, nullptr); + // // Open the repository again + // pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + // repo = (STsdbRepo *)pRepo; + // ASSERT_NE(pRepo, nullptr); - // Read from file - SRWHelper rhelper; - tsdbInitReadHelper(&rhelper, repo); + // // Read from file + // SRWHelper rhelper; + // tsdbInitReadHelper(&rhelper, repo); - SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833); - ASSERT_NE(pFGroup, nullptr); - ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0); + // SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833); + // ASSERT_NE(pFGroup, nullptr); + // ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0); - STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid); - ASSERT_NE(pTable, nullptr); - tsdbSetHelperTable(&rhelper, pTable, repo); + // STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid); + // ASSERT_NE(pTable, nullptr); + // tsdbSetHelperTable(&rhelper, pTable, repo); - ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0); - ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0); + // ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0); + // ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0); int k = 0; }