diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index fcae7a415fae8bc1b5897bf6865f1cc2004ba731..8ee73291566afcbf604b607c0bc6a426ca372b87 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -319,7 +319,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); -int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset); // ----------------- K-V data row structure /* diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index f5b84e4c9ad7492ecea11eb232b33f373d37235a..7ae34d532ca7ca52d3414a9d8f7366db15e046b8 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -441,30 +441,35 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) pCols->numOfRows++; } -int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); + int offset = 0; + + if (pOffset == NULL) { + pOffset = &offset; + } SDataCols *pTarget = NULL; - if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap + if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyFirst(source))) { // No overlap ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { if (source->cols[j].len > 0) { - dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows, + dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, target->maxPoints); } } target->numOfRows++; } + (*pOffset) += rowsToMerge; } else { pTarget = tdDupDataCols(target, true); if (pTarget == NULL) goto _err; int iter1 = 0; - int iter2 = 0; - tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows, + tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows, pTarget->numOfRows + rowsToMerge); } diff --git a/src/inc/tfs.h b/src/inc/tfs.h index 4ed21bc6e1ddb4d9011d701ad99e6f2a71f54b0e..e72620eca6965d78609bbbd283fff8085d08a4b8 100644 --- a/src/inc/tfs.h +++ b/src/inc/tfs.h @@ -31,6 +31,8 @@ typedef struct { #define TFS_UNDECIDED_ID -1 #define TFS_PRIMARY_LEVEL 0 #define TFS_PRIMARY_ID 0 +#define TFS_MIN_LEVEL 0 +#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1) // FS APIs ==================================== typedef struct { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 376836369dbbe9b623b6619281277e61ab5ef4f0..05d29daad52c2eeeeee080b411ebedf4b7e8233f 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -409,6 +409,9 @@ void tsdbDecCommitRef(int vgId); int tsdbSyncSend(void *pRepo, SOCKET socketFd); int tsdbSyncRecv(void *pRepo, SOCKET socketFd); +// For TSDB Compact +int tsdbCompact(STsdbRepo *pRepo); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/CMakeLists.txt b/src/tsdb/CMakeLists.txt index 31d52aae7d4a809044ab01a7b561801d1ad0c2eb..8080a61a6c9b10b78d965a953765250e0a157fb6 100644 --- a/src/tsdb/CMakeLists.txt +++ b/src/tsdb/CMakeLists.txt @@ -6,6 +6,10 @@ AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tsdb ${SRC}) TARGET_LINK_LIBRARIES(tsdb tfs common tutil) +IF (TD_TSDB_PLUGINS) + TARGET_LINK_LIBRARIES(tsdb tsdbPlugins) +ENDIF () + IF (TD_LINUX) # Someone has no gtest directory, so comment it # ADD_SUBDIRECTORY(tests) diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h index 6bd5dc432584bf1e7f8ed4274efdf014edf54e50..cde728b1705cd1eead065772978631fb4b36246d 100644 --- a/src/tsdb/inc/tsdbCommit.h +++ b/src/tsdb/inc/tsdbCommit.h @@ -29,10 +29,17 @@ typedef struct { int64_t size; } SKVRecord; +#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) + void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbCommitData(STsdbRepo *pRepo); +int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); +int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); +int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, + bool isLast, bool isSuper, void **ppBuf, void **ppCBuf); int tsdbApplyRtn(STsdbRepo *pRepo); static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { diff --git a/src/tsdb/inc/tsdbCommitQueue.h b/src/tsdb/inc/tsdbCommitQueue.h index c2353391f9ad88185097eb52907e890c96021f8c..b690e3bdc25d86acf7e5b9d580470a80f3f4316f 100644 --- a/src/tsdb/inc/tsdbCommitQueue.h +++ b/src/tsdb/inc/tsdbCommitQueue.h @@ -16,6 +16,8 @@ #ifndef _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_ -int tsdbScheduleCommit(STsdbRepo *pRepo); +typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T; + +int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req); #endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbCompact.h b/src/tsdb/inc/tsdbCompact.h new file mode 100644 index 0000000000000000000000000000000000000000..5a382de5e0cc438dbd444d9a0895fd5f55094476 --- /dev/null +++ b/src/tsdb/inc/tsdbCompact.h @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_TSDB_COMPACT_H_ +#define _TD_TSDB_COMPACT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +void *tsdbCompactImpl(STsdbRepo *pRepo); + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_COMPACT_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 945b74af353bcf771c870a8e66fff7eb5f51c614..049c1bdb6ea37121a202a931faa17a4ea36cf6dc 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -64,6 +64,8 @@ extern "C" { #include "tsdbReadImpl.h" // Commit #include "tsdbCommit.h" +// Compact +#include "tsdbCompact.h" // Commit Queue #include "tsdbCommitQueue.h" // Main definitions diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index f2bbe347bd0bfa1f8f7ae5feb016dbfea13adfd7..82cc6f07f77300aadd554a7c22c0cf77308b3e53 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -51,7 +51,7 @@ typedef struct { #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) -#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5) +#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) static int tsdbCommitMeta(STsdbRepo *pRepo); @@ -72,7 +72,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbComparKeyBlock(const void *arg1, const void *arg2); static int tsdbWriteBlockInfo(SCommitH *pCommih); -static int tsdbWriteBlockIdx(SCommitH *pCommih); static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); static int tsdbMoveBlock(SCommitH *pCommith, int bidx); @@ -86,7 +85,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); -static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); void *tsdbCommitData(STsdbRepo *pRepo) { if (pRepo->imem == NULL) { @@ -117,6 +115,151 @@ _err: return NULL; } +int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { + SDiskID did; + SDFileSet nSet; + STsdbFS * pfs = REPO_FS(pRepo); + int level; + + ASSERT(pSet->fid >= pRtn->minFid); + + level = tsdbGetFidLevel(pSet->fid, pRtn); + + tfsAllocDisk(level, &(did.level), &(did.id)); + if (did.level == TFS_UNDECIDED_LEVEL) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } + + if (did.level > TSDB_FSET_LEVEL(pSet)) { + // Need to move the FSET to higher level + tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs)); + + if (tsdbCopyDFileSet(pSet, &nSet) < 0) { + tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); + return -1; + } + + if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { + return -1; + } + + tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); + } else { + // On a correct level + if (tsdbUpdateDFileSet(pfs, pSet) < 0) { + return -1; + } + } + + return 0; +} + +int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, + SBlockIdx *pIdx) { + size_t nSupBlocks; + size_t nSubBlocks; + uint32_t tlen; + SBlockInfo *pBlkInfo; + int64_t offset; + SBlock * pBlock; + + memset(pIdx, 0, sizeof(*pIdx)); + + nSupBlocks = taosArrayGetSize(pSupA); + nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); + + if (nSupBlocks <= 0) { + // No data (data all deleted) + return 0; + } + + tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); + if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; + pBlkInfo = *ppBuf; + + pBlkInfo->delimiter = TSDB_FILE_DELIMITER; + pBlkInfo->tid = TABLE_TID(pTable); + pBlkInfo->uid = TABLE_UID(pTable); + + memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); + if (nSubBlocks > 0) { + memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); + + for (int i = 0; i < nSupBlocks; i++) { + pBlock = pBlkInfo->blocks + i; + + if (pBlock->numOfSubBlocks > 1) { + pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); + } + } + } + + taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); + + if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { + return -1; + } + + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); + + // Set pIdx + pBlock = taosArrayGetLast(pSupA); + + pIdx->tid = TABLE_TID(pTable); + pIdx->uid = TABLE_UID(pTable); + pIdx->hasLast = pBlock->last ? 1 : 0; + pIdx->maxKey = pBlock->keyLast; + pIdx->numOfBlocks = (uint32_t)nSupBlocks; + pIdx->len = tlen; + pIdx->offset = (uint32_t)offset; + + return 0; +} + +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { + SBlockIdx *pBlkIdx; + size_t nidx = taosArrayGetSize(pIdxA); + int tlen = 0, size; + int64_t offset; + + if (nidx <= 0) { + // All data are deleted + pHeadf->info.offset = 0; + pHeadf->info.len = 0; + return 0; + } + + for (size_t i = 0; i < nidx; i++) { + pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); + + size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); + if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; + + void *ptr = POINTER_SHIFT(*ppBuf, tlen); + tsdbEncodeSBlockIdx(&ptr, pBlkIdx); + + tlen += size; + } + + tlen += sizeof(TSCKSUM); + if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; + taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); + + if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { + return -1; + } + + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); + pHeadf->info.offset = (uint32_t)offset; + pHeadf->info.len = tlen; + + return 0; +} + + // =================== Commit Meta Data static int tsdbCommitMeta(STsdbRepo *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); @@ -446,7 +589,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { } } - if (tsdbWriteBlockIdx(pCommith) < 0) { + if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < + 0) { tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbCloseCommitFile(pCommith, true); // revert the file change @@ -754,23 +898,21 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { } } -static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, - bool isSuper) { - STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith); +int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, + bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) { STsdbCfg * pCfg = REPO_CFG(pRepo); SBlockData *pBlockData; int64_t offset = 0; - STable * pTable = TSDB_COMMIT_TABLE(pCommith); int rowsToWrite = pDataCols->numOfRows; ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); // Make buffer space - if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { + if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { return -1; } - pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith); + pBlockData = (SBlockData *)(*ppBuf); // Get # of cols not all NULL(not including key column) int nColsNotAllNull = 0; @@ -816,23 +958,23 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo void * tptr; // Make room - if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { + if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { return -1; } - pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith); + pBlockData = (SBlockData *)(*ppBuf); pBlockCol = pBlockData->cols + tcol; tptr = POINTER_SHIFT(pBlockData, lsize); if (pCfg->compression == TWO_STAGE_COMP && - tsdbMakeRoom((void **)(&TSDB_COMMIT_COMP_BUF(pCommith)), tlen + COMP_OVERFLOW_BYTES) < 0) { + tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { return -1; } // Compress or just copy if (pCfg->compression) { flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, - tlen + COMP_OVERFLOW_BYTES, pCfg->compression, - TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES); + tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, + tlen + COMP_OVERFLOW_BYTES); } else { flen = tlen; memcpy(tptr, pDataCol->pData, flen); @@ -888,117 +1030,33 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo return 0; } -static int tsdbWriteBlockInfo(SCommitH *pCommih) { - SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); - SBlockIdx blkIdx; - STable * pTable = TSDB_COMMIT_TABLE(pCommih); - SBlock * pBlock; - size_t nSupBlocks; - size_t nSubBlocks; - uint32_t tlen; - SBlockInfo *pBlkInfo; - int64_t offset; - - nSupBlocks = taosArrayGetSize(pCommih->aSupBlk); - nSubBlocks = taosArrayGetSize(pCommih->aSubBlk); - - if (nSupBlocks <= 0) { - // No data (data all deleted) - return 0; - } - - tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); - - // Write SBlockInfo part - if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1; - pBlkInfo = TSDB_COMMIT_BUF(pCommih); - - pBlkInfo->delimiter = TSDB_FILE_DELIMITER; - pBlkInfo->tid = TABLE_TID(pTable); - pBlkInfo->uid = TABLE_UID(pTable); - - memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pCommih->aSupBlk, 0), nSupBlocks * sizeof(SBlock)); - if (nSubBlocks > 0) { - memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pCommih->aSubBlk, 0), nSubBlocks * sizeof(SBlock)); - - for (int i = 0; i < nSupBlocks; i++) { - pBlock = pBlkInfo->blocks + i; - - if (pBlock->numOfSubBlocks > 1) { - pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); - } - } - } - - taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); - - if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < 0) { - return -1; - } - - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); - - // Set blkIdx - pBlock = taosArrayGet(pCommih->aSupBlk, nSupBlocks - 1); +static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuper) { + return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast, + isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), + (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); +} - blkIdx.tid = TABLE_TID(pTable); - blkIdx.uid = TABLE_UID(pTable); - blkIdx.hasLast = pBlock->last ? 1 : 0; - blkIdx.maxKey = pBlock->keyLast; - blkIdx.numOfBlocks = (uint32_t)nSupBlocks; - blkIdx.len = tlen; - blkIdx.offset = (uint32_t)offset; - ASSERT(blkIdx.numOfBlocks > 0); +static int tsdbWriteBlockInfo(SCommitH *pCommih) { + SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); + SBlockIdx blkIdx; + STable * pTable = TSDB_COMMIT_TABLE(pCommih); - if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), + &blkIdx) < 0) { return -1; } - return 0; -} - -static int tsdbWriteBlockIdx(SCommitH *pCommih) { - SBlockIdx *pBlkIdx = NULL; - SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); - size_t nidx = taosArrayGetSize(pCommih->aBlkIdx); - int tlen = 0, size = 0; - int64_t offset = 0; - - if (nidx <= 0) { - // All data are deleted - pHeadf->info.offset = 0; - pHeadf->info.len = 0; + if (blkIdx.numOfBlocks == 0) { return 0; } - for (size_t i = 0; i < nidx; i++) { - pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i); - - size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); - if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen + size) < 0) return -1; - - void *ptr = POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen); - tsdbEncodeSBlockIdx(&ptr, pBlkIdx); - - tlen += size; - } - - tlen += sizeof(TSCKSUM); - if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen) < 0) return -1; - taosCalcChecksumAppend(0, (uint8_t *)TSDB_COMMIT_BUF(pCommih), tlen); - - if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < tlen) { - tsdbError("vgId:%d failed to write block index part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih), - TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno)); + if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen - sizeof(TSCKSUM))); - pHeadf->info.offset = (uint32_t)offset; - pHeadf->info.len = tlen; - return 0; } @@ -1454,45 +1512,3 @@ int tsdbApplyRtn(STsdbRepo *pRepo) { return 0; } - -static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { - SDiskID did; - SDFileSet nSet; - STsdbFS * pfs = REPO_FS(pRepo); - int level; - - ASSERT(pSet->fid >= pRtn->minFid); - - level = tsdbGetFidLevel(pSet->fid, pRtn); - - tfsAllocDisk(level, &(did.level), &(did.id)); - if (did.level == TFS_UNDECIDED_LEVEL) { - terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; - return -1; - } - - if (did.level > TSDB_FSET_LEVEL(pSet)) { - // Need to move the FSET to higher level - tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs)); - - if (tsdbCopyDFileSet(pSet, &nSet) < 0) { - tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); - return -1; - } - - if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { - return -1; - } - - tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); - } else { - // On a correct level - if (tsdbUpdateDFileSet(pfs, pSet) < 0) { - return -1; - } - } - - return 0; -} \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 4d4585421c854eefa92ef91605381ab5b54514fe..e25014bc1e8f2456eece3d517096cfee66886800 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -26,8 +26,9 @@ typedef struct { } SCommitQueue; typedef struct { + TSDB_REQ_T req; STsdbRepo *pRepo; -} SCommitReq; +} SReq; static void *tsdbLoopCommit(void *arg); @@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() { pthread_mutex_destroy(&(pQueue->lock)); } -int tsdbScheduleCommit(STsdbRepo *pRepo) { +int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { SCommitQueue *pQueue = &tsCommitQueue; - SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq)); + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq)); if (pNode == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - ((SCommitReq *)pNode->data)->pRepo = pRepo; + ((SReq *)pNode->data)->req = req; + ((SReq *)pNode->data)->pRepo = pRepo; pthread_mutex_lock(&(pQueue->lock)); @@ -154,7 +156,7 @@ static void *tsdbLoopCommit(void *arg) { SCommitQueue *pQueue = &tsCommitQueue; SListNode * pNode = NULL; STsdbRepo * pRepo = NULL; - bool config_changed = false; + TSDB_REQ_T req; while (true) { pthread_mutex_lock(&(pQueue->lock)); @@ -175,18 +177,19 @@ static void *tsdbLoopCommit(void *arg) { pthread_mutex_unlock(&(pQueue->lock)); - pRepo = ((SCommitReq *)pNode->data)->pRepo; + req = ((SReq *)pNode->data)->req; + pRepo = ((SReq *)pNode->data)->pRepo; - // check if need to apply new config - config_changed = pRepo->config_changed; - if (pRepo->config_changed) { + if (req == COMMIT_REQ) { + tsdbCommitData(pRepo); + } else if (req == COMPACT_REQ) { + tsdbCompactImpl(pRepo); + } else if (req == COMMIT_CONFIG_REQ) { + ASSERT(pRepo->config_changed); tsdbApplyRepoConfig(pRepo); - } - - if (config_changed && pRepo->imem == NULL) { tsem_post(&(pRepo->readyToCommit)); } else { - tsdbCommitData(pRepo); + ASSERT(0); } listNodeFree(pNode); diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index 6dea4a4e57392be988126c579648f39a8270b9bf..635bba388a6063868ca002859044d0845cb5c163 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -11,4 +11,12 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ +#include "tsdb.h" + +#ifndef _TSDB_PLUGINS + +int tsdbCompact(STsdbRepo *pRepo) { return 0; } +void *tsdbCompactImpl(STsdbRepo *pRepo) { return NULL; } + +#endif \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 4ca7b3b82000bed9a9460f4195f1d4db7e4c76e1..1f88474f8e70beb4e31afcbc3c460b7ab80c45dd 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -280,7 +280,7 @@ int tsdbAsyncCommitConfig(STsdbRepo* pRepo) { } if (tsdbLockRepo(pRepo) < 0) return -1; - tsdbScheduleCommit(pRepo); + tsdbScheduleCommit(pRepo, COMMIT_CONFIG_REQ); if (tsdbUnlockRepo(pRepo) < 0) return -1; return 0; @@ -303,7 +303,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { if (tsdbLockRepo(pRepo) < 0) return -1; pRepo->imem = pRepo->mem; pRepo->mem = NULL; - tsdbScheduleCommit(pRepo); + tsdbScheduleCommit(pRepo, COMMIT_REQ); if (tsdbUnlockRepo(pRepo) < 0) return -1; return 0; diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 7212ae1636400b0ce45ef27306356f658fb3f90d..dd14dc700f85c3126411cf4613651463745f71d5 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -258,7 +258,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; - if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1; } ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); @@ -284,7 +284,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; - if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1; } ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); diff --git a/tests/pytest/alter/alter_cacheLastRow.py b/tests/pytest/alter/alter_cacheLastRow.py new file mode 100644 index 0000000000000000000000000000000000000000..36a2864d0f91cf485505ee105870a98df71c4df8 --- /dev/null +++ b/tests/pytest/alter/alter_cacheLastRow.py @@ -0,0 +1,109 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from datetime import datetime + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def run(self): + tdSql.prepare() + tdSql.query('show databases') + tdSql.checkData(0,15,0) + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath + "/build/bin/" + + #write 5M rows into db, then restart to force the data move into disk. + #create 500 tables + os.system("%staosdemo -f tools/taosdemoAllTest/insert_5M_rows.json -y " % binPath) + tdDnodes.stop(1) + tdDnodes.start(1) + tdSql.execute('use db') + + #prepare to query 500 tables last_row() + tableName = [] + for i in range(500): + tableName.append(f"stb_{i}") + tdSql.execute('use db') + lastRow_Off_start = datetime.now() + + slow = 0 #count time where lastRow on is slower + for i in range(5): + #switch lastRow to off and check + tdSql.execute('alter database db cachelast 0') + tdSql.query('show databases') + tdSql.checkData(0,15,0) + + #run last_row(*) query 500 times + for i in range(500): + tdSql.execute(f'SELECT LAST_ROW(*) FROM {tableName[i]}') + lastRow_Off_end = datetime.now() + + tdLog.debug(f'time used:{lastRow_Off_end-lastRow_Off_start}') + + #switch lastRow to on and check + tdSql.execute('alter database db cachelast 1') + tdSql.query('show databases') + tdSql.checkData(0,15,1) + + #run last_row(*) query 500 times + tdSql.execute('use db') + lastRow_On_start = datetime.now() + for i in range(500): + tdSql.execute(f'SELECT LAST_ROW(*) FROM {tableName[i]}') + lastRow_On_end = datetime.now() + + tdLog.debug(f'time used:{lastRow_On_end-lastRow_On_start}') + + #check which one used more time + if (lastRow_Off_end-lastRow_Off_start > lastRow_On_end-lastRow_On_start): + pass + else: + slow += 1 + tdLog.debug(slow) + if slow > 1: #tolerance for the first time + tdLog.exit('lastRow hot alter failed') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index c5aba24867a0a45d216902fc685aa0153ddbff14..19bc5e9cf920344a1caeebadcc7f0a62c853d50e 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -336,5 +336,5 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py python3 ./test.py -f tag_lite/drop_auto_create.py python3 test.py -f insert/insert_before_use_db.py - +python3 test.py -f alter/alter_cacheLastRow.py #======================p4-end=============== diff --git a/tests/pytest/tools/taosdemoAllTest/insert_5M_rows.json b/tests/pytest/tools/taosdemoAllTest/insert_5M_rows.json new file mode 100644 index 0000000000000000000000000000000000000000..4637009ca36ef74dd445a166b5fedf782528d513 --- /dev/null +++ b/tests/pytest/tools/taosdemoAllTest/insert_5M_rows.json @@ -0,0 +1,60 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 4, + "thread_count_create_tbl": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "insert_interval": 0, + "interlace_rows": 100, + "num_of_records_per_req": 100, + "databases": [{ + "dbinfo": { + "name": "db", + "drop": "yes", + "replica": 1, + "days": 10, + "cache": 16, + "blocks": 8, + "precision": "ms", + "keep": 3650, + "minRows": 100, + "maxRows": 4096, + "comp":2, + "walLevel":1, + "cachelast":0, + "quorum":1, + "fsync":3000, + "update": 0 + }, + "super_tables": [{ + "name": "stb", + "child_table_exists":"no", + "childtable_count": 500, + "childtable_prefix": "stb_", + "auto_create_table": "no", + "batch_create_tbl_num": 20, + "data_source": "rand", + "insert_mode": "taosc", + "insert_rows": 10000, + "childtable_limit": 10, + "childtable_offset":100, + "interlace_rows": 0, + "insert_interval":0, + "max_sql_len": 1024000, + "disorder_ratio": 0, + "disorder_range": 1000, + "timestamp_step": 10, + "start_timestamp": "2020-10-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "tags_file": "", + "columns": [{"type": "INT"}], + "tags": [{"type": "TINYINT", "count":2}] + }] + }] +} \ No newline at end of file