diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 3db3f1d98a9aebc00395ec1afe1b55ad3cc3af96..b5451ae691a45d3b66dbb931912e93742365d737 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -353,8 +353,9 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].bytes = pDataCols->cols[i].bytes; pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].offset = pDataCols->cols[i].offset; + pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].len); + if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].bytes * pDataCols->numOfPoints); } return pRet; @@ -410,36 +411,47 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { SDataCols *pTarget = tdDupDataCols(target, true); if (pTarget == NULL) goto _err; + tdResetDataCols(target); int iter1 = 0; int iter2 = 0; while (true) { - if (iter1 >= pTarget->numOfPoints) { - // TODO: merge the source part - int rowsLeft = source->numOfPoints - iter2; - if (rowsLeft > 0) { - for (int i = 0; i < source->numOfCols; i++) { - ASSERT(target->cols[i].type == source->cols[i].type); - - memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), - (void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2), - TYPE_BYTES[target->cols[i].type] * rowsLeft); - } + if (iter1 >= pTarget->numOfPoints && iter2 >= source->numOfPoints) break; + + TSKEY key1 = (iter1 >= pTarget->numOfPoints) ? INT64_MAX : ((TSKEY *)(pTarget->cols[0].pData))[iter1]; + TSKEY key2 = (iter2 >= rowsToMerge) ? INT64_MAX : ((TSKEY *)(source->cols[0].pData))[iter2]; + + if (key1 < key2) { // Copy from pTarget + for (int i = 0; i < pTarget->numOfCols; i++) { + ASSERT(target->cols[i].type == pTarget->cols[i].type); + memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), + (void *)((char *)(pTarget->cols[i].pData) + TYPE_BYTES[pTarget->cols[i].type] * iter1), + TYPE_BYTES[target->cols[i].type]); + target->cols[i].len += TYPE_BYTES[target->cols[i].type]; } - break; - } - if (iter2 >= source->numOfPoints) { - // TODO: merge the pTemp part - int rowsLeft = pTarget->numOfPoints - iter1; - if (rowsLeft > 0) { + target->numOfPoints++; + iter1++; + } else if (key1 > key2) { // Copy from source + for (int i = 0; i < source->numOfCols; i++) { + ASSERT(target->cols[i].type == pTarget->cols[i].type); + memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), + (void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2), + TYPE_BYTES[target->cols[i].type]); + target->cols[i].len += TYPE_BYTES[target->cols[i].type]; } - break; + + target->numOfPoints++; + iter2++; + } else { + assert(false); } } + tdFreeDataCols(pTarget); return 0; _err: + tdFreeDataCols(pTarget); return -1; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 02216cc69b9d5da933182f4cdac580076b704ca4..ac96cff3663f37f903f12dfd6ae136c73f968e57 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -335,7 +335,7 @@ _err: int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompBlock compBlock; - if ((pHelper->files.nHeadF.fd > 0) && (pHelper->hasOldLastBlock)) { + if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfSuperBlocks - 1; @@ -375,6 +375,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; } } else { + pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; + pHelper->pCompInfo->uid = pHelper->tableInfo.uid; taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE); @@ -530,6 +532,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa if (pCompData == NULL) return -1; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); @@ -947,6 +950,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId (pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SCompBlock) * 2 : pIdx->len + sizeof(SCompBlock); if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; + pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + // Add the sub-block if (pSCompBlock->numOfSubBlocks > 1) { size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len); diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index b54db6febbad7a7361aa6977f755a47dcf740ea9..9d1bae18327c77978745f08a647974729b55bc13 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -5,12 +5,84 @@ #include "dataformat.h" #include "tsdbMain.h" -double getCurTime() { +static double getCurTime() { struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec + tv.tv_usec * 1E-6; } +typedef struct { + tsdb_repo_t *pRepo; + int tid; + int64_t uid; + int sversion; + TSKEY startTime; + TSKEY interval; + int totalRows; + int rowsPerSubmit; + STSchema * pSchema; +} SInsertInfo; + +static int insertData(SInsertInfo *pInfo) { + SSubmitMsg *pMsg = + (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit); + if (pMsg == NULL) return -1; + TSKEY start_time = pInfo->startTime; + + // Loop to write data + double stime = getCurTime(); + + for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) { + memset((void *)pMsg, 0, sizeof(SSubmitMsg)); + SSubmitBlk *pBlock = pMsg->blocks; + pBlock->uid = pInfo->uid; + pBlock->tid = pInfo->tid; + pBlock->sversion = pInfo->sversion; + pBlock->len = 0; + for (int i = 0; i < pInfo->rowsPerSubmit; i++) { + // start_time += 1000; + start_time += pInfo->interval; + SDataRow row = (SDataRow)(pBlock->data + pBlock->len); + tdInitDataRow(row, pInfo->pSchema); + + for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) { + if (j == 0) { // Just for timestamp + tdAppendColVal(row, (void *)(&start_time), schemaColAt(pInfo->pSchema, j)); + } else { // For int + int val = 10; + tdAppendColVal(row, (void *)(&val), schemaColAt(pInfo->pSchema, j)); + } + } + pBlock->len += dataRowLen(row); + } + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + pMsg->numOfBlocks = 1; + + pBlock->len = htonl(pBlock->len); + pBlock->numOfRows = htonl(pBlock->numOfRows); + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + + pBlock->sversion = htonl(pBlock->sversion); + pBlock->padding = htonl(pBlock->padding); + + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + pMsg->compressed = htonl(pMsg->numOfBlocks); + + if (tsdbInsertData(pInfo->pRepo, pMsg) < 0) { + tfree(pMsg); + return -1; + } + } + + double etime = getCurTime(); + + printf("Spent %f seconds to write %d records\n", etime - stime, pInfo->totalRows); + tfree(pMsg); + return 0; +} + TEST(TsdbTest, DISABLED_tableEncodeDecode) { // TEST(TsdbTest, tableEncodeDecode) { STable *pTable = (STable *)malloc(sizeof(STable)); @@ -51,6 +123,7 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { // TEST(TsdbTest, DISABLED_createRepo) { TEST(TsdbTest, createRepo) { STsdbCfg config; + STsdbRepo *repo; // 1. Create a tsdb repository tsdbSetDefaultCfg(&config); @@ -79,64 +152,73 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); - // // 3. Loop to write some simple data - int nRows = 10000000; - int rowsPerSubmit = 10; - int64_t start_time = 1584081000000; - - SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); - - double stime = getCurTime(); - - for (int k = 0; k < nRows/rowsPerSubmit; k++) { - memset((void *)pMsg, 0, sizeof(SSubmitMsg)); - SSubmitBlk *pBlock = pMsg->blocks; - pBlock->uid = 987607499877672L; - pBlock->tid = 0; - pBlock->sversion = 0; - pBlock->len = 0; - for (int i = 0; i < rowsPerSubmit; i++) { - // start_time += 1000; - start_time += 1000; - SDataRow row = (SDataRow)(pBlock->data + pBlock->len); - tdInitDataRow(row, schema); - - for (int j = 0; j < schemaNCols(schema); j++) { - if (j == 0) { // Just for timestamp - tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); - } else { // For int - int val = 10; - tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); - } - } - pBlock->len += dataRowLen(row); - } - pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; - pMsg->numOfBlocks = 1; - - pBlock->len = htonl(pBlock->len); - pBlock->numOfRows = htonl(pBlock->numOfRows); - pBlock->uid = htobe64(pBlock->uid); - pBlock->tid = htonl(pBlock->tid); - - pBlock->sversion = htonl(pBlock->sversion); - pBlock->padding = htonl(pBlock->padding); + // Insert Some Data + SInsertInfo iInfo = { + .pRepo = pRepo, + .tid = tCfg.tableId.tid, + .uid = tCfg.tableId.uid, + .sversion = tCfg.sversion, + .startTime = 1584081000000, + .interval = 1000, + .totalRows = 50, + .rowsPerSubmit = 1, + .pSchema = schema + }; + + ASSERT_EQ(insertData(&iInfo), 0); + + // Close the repository + tsdbCloseRepo(pRepo); - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - pMsg->compressed = htonl(pMsg->numOfBlocks); + // Open the repository again + pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + repo = (STsdbRepo *)pRepo; + ASSERT_NE(pRepo, nullptr); - tsdbInsertData(pRepo, pMsg); - } + // Insert more data + iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows; + iInfo.totalRows = 10; + iInfo.pRepo = pRepo; + ASSERT_EQ(insertData(&iInfo), 0); - double etime = getCurTime(); + // Close the repository + tsdbCloseRepo(pRepo); - void *ptr = malloc(150000); - free(ptr); + // Open the repository again + pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + repo = (STsdbRepo *)pRepo; + ASSERT_NE(pRepo, nullptr); - printf("Spent %f seconds to write %d records\n", etime - stime, nRows); + // Read from file + SRWHelper rhelper; + SHelperCfg helperCfg = { + .type = TSDB_READ_HELPER, + .maxTables = repo->config.maxTables, + .maxRowSize = repo->tsdbMeta->maxRowBytes, + .maxRows = repo->config.maxRowsPerFileBlock, + .maxCols = repo->tsdbMeta->maxCols, + .minRowsPerFileBlock = repo->config.minRowsPerFileBlock, + .maxRowsPerFileBlock = repo->config.maxRowsPerFileBlock, + .compress = repo->config.compression, + + }; + tsdbInitHelper(&rhelper, &helperCfg); + + SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833); + ASSERT_NE(pFGroup, nullptr); + ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0); + + SHelperTable htable = { + .uid = tCfg.tableId.uid, + .tid = tCfg.tableId.tid, + .sversion = tCfg.sversion + }; + tsdbSetHelperTable(&rhelper, &htable, schema); + + ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0); + ASSERT_EQ(tsdbLoadBlockData(&rhelper, 0, NULL), 0); - tsdbCloseRepo(pRepo); + int k = 0; } TEST(TsdbTest, DISABLED_openRepo) {