diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 62a43333aed5e9feb6bfa49944e6c607e70939a6..6873c69c2a7377c09a2cbffa3269cdf96212452d 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -763,6 +763,8 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max int numOfRows = 0; do { + if (numOfRows >= maxRowsToRead) break; + SSkipListNode *node = tSkipListIterGet(pIter); if (node == NULL) break; @@ -770,9 +772,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max if (dataRowKey(row) > maxKey) break; tdAppendDataRowToDataCol(row, pCols); - numOfRows++; - if (numOfRows >= maxRowsToRead) break; } while (tSkipListIterNext(pIter)); return numOfRows; diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 9ee49d6a708a65b3344fc5fd56f458b0a9dd23cc..eac29af097da67706cbcfff222a6e18c75d2e41c 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -48,104 +48,104 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); } -// TEST(TsdbTest, DISABLED_createRepo) { -TEST(TsdbTest, createRepo) { - STsdbCfg config; - - // 1. Create a tsdb repository - tsdbSetDefaultCfg(&config); - tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL); - ASSERT_NE(pRepo, nullptr); - - // 2. Create a normal table - STableCfg tCfg; - ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); - ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0); - - int nCols = 5; - STSchema *schema = tdNewSchema(nCols); - - for (int i = 0; i < nCols; i++) { - if (i == 0) { - tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); - } else { - tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); - } - } - - tsdbTableSetSchema(&tCfg, schema, true); - - tsdbCreateTable(pRepo, &tCfg); - - // // 3. Loop to write some simple data - int nRows = 1; - int rowsPerSubmit = 1; - int64_t start_time = 1584081000000; - - SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); - - double stime = getCurTime(); - - for (int k = 0; k < nRows/rowsPerSubmit; k++) { - memset((void *)pMsg, 0, sizeof(SSubmitMsg)); - SSubmitBlk *pBlock = pMsg->blocks; - pBlock->uid = 987607499877672L; - pBlock->tid = 0; - pBlock->sversion = 0; - pBlock->len = 0; - for (int i = 0; i < rowsPerSubmit; i++) { - // start_time += 1000; - start_time += 1000; - SDataRow row = (SDataRow)(pBlock->data + pBlock->len); - tdInitDataRow(row, schema); - - for (int j = 0; j < schemaNCols(schema); j++) { - if (j == 0) { // Just for timestamp - tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); - } else { // For int - int val = 10; - tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); - } - } - pBlock->len += dataRowLen(row); - } - pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; - pMsg->numOfBlocks = 1; - - pBlock->len = htonl(pBlock->len); - pBlock->numOfRows = htonl(pBlock->numOfRows); - pBlock->uid = htobe64(pBlock->uid); - pBlock->tid = htonl(pBlock->tid); - - pBlock->sversion = htonl(pBlock->sversion); - pBlock->padding = htonl(pBlock->padding); - - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - pMsg->compressed = htonl(pMsg->numOfBlocks); - - tsdbInsertData(pRepo, pMsg); - } - - double etime = getCurTime(); - - void *ptr = malloc(150000); - free(ptr); - - printf("Spent %f seconds to write %d records\n", etime - stime, nRows); - - tsdbCloseRepo(pRepo); +TEST(TsdbTest, DISABLED_createRepo) { +// TEST(TsdbTest, createRepo) { + // STsdbCfg config; + + // // 1. Create a tsdb repository + // tsdbSetDefaultCfg(&config); + // tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL); + // ASSERT_NE(pRepo, nullptr); + + // // 2. Create a normal table + // STableCfg tCfg; + // ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); + // ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0); + + // int nCols = 5; + // STSchema *schema = tdNewSchema(nCols); + + // for (int i = 0; i < nCols; i++) { + // if (i == 0) { + // tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); + // } else { + // tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); + // } + // } + + // tsdbTableSetSchema(&tCfg, schema, true); + + // tsdbCreateTable(pRepo, &tCfg); + + // // // 3. Loop to write some simple data + // int nRows = 1; + // int rowsPerSubmit = 1; + // int64_t start_time = 1584081000000; + + // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); + + // double stime = getCurTime(); + + // for (int k = 0; k < nRows/rowsPerSubmit; k++) { + // memset((void *)pMsg, 0, sizeof(SSubmitMsg)); + // SSubmitBlk *pBlock = pMsg->blocks; + // pBlock->uid = 987607499877672L; + // pBlock->tid = 0; + // pBlock->sversion = 0; + // pBlock->len = 0; + // for (int i = 0; i < rowsPerSubmit; i++) { + // // start_time += 1000; + // start_time += 1000; + // SDataRow row = (SDataRow)(pBlock->data + pBlock->len); + // tdInitDataRow(row, schema); + + // for (int j = 0; j < schemaNCols(schema); j++) { + // if (j == 0) { // Just for timestamp + // tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); + // } else { // For int + // int val = 10; + // tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); + // } + // } + // pBlock->len += dataRowLen(row); + // } + // pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + // pMsg->numOfBlocks = 1; + + // pBlock->len = htonl(pBlock->len); + // pBlock->numOfRows = htonl(pBlock->numOfRows); + // pBlock->uid = htobe64(pBlock->uid); + // pBlock->tid = htonl(pBlock->tid); + + // pBlock->sversion = htonl(pBlock->sversion); + // pBlock->padding = htonl(pBlock->padding); + + // pMsg->length = htonl(pMsg->length); + // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + // pMsg->compressed = htonl(pMsg->numOfBlocks); + + // tsdbInsertData(pRepo, pMsg); + // } + + // double etime = getCurTime(); + + // void *ptr = malloc(150000); + // free(ptr); + + // printf("Spent %f seconds to write %d records\n", etime - stime, nRows); + + // tsdbCloseRepo(pRepo); } // TEST(TsdbTest, DISABLED_openRepo) { TEST(TsdbTest, openRepo) { - tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL); ASSERT_NE(repo, nullptr); STsdbRepo *pRepo = (STsdbRepo *)repo; - SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1833); + SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655); for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { tsdbOpenFile(&pGroup->files[type], O_RDONLY); @@ -156,7 +156,7 @@ TEST(TsdbTest, openRepo) { SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len); - tsdbLoadCompBlocks(pGroup, &pIdx[0], (void *)pCompInfo); + tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo); int blockIdx = 0; SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]); @@ -166,11 +166,15 @@ TEST(TsdbTest, openRepo) { tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); - SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(pTable->schema), 5, 10); - tdInitDataCols(pDataCols, pTable->schema); + SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10); + tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable)); tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); + tdResetDataCols(pDataCols); + + tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData); + int k = 0;