diff --git a/cmake/cmake.version b/cmake/cmake.version index db29644b387306ce8f3ee473921dab4c7d05b10a..0873e59e92c0d70ad41d3772a76009d50239e4cb 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.0.1") + SET(TD_VER_NUMBER "3.0.0.2") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index 54e898ecc093a5401b93080a13156f284a4cd0a4..15d37648769afb0731fb2ed3585ccc8f48f76e7e 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git - GIT_TAG 6fc47d7 + GIT_TAG 0609b50 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 939439ca56c33cffe942466b3886845567650d5c..c598b188b7fab2478e5777db1251d56d0fa1a166 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1592,6 +1592,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, i); if (i == 0) { colDataAppend(pColInfo, rows, buf, false); + } else if (i == 1) { + colDataAppend(pColInfo, rows, (const char *)&pDb->createdTime, false); } else if (i == 3) { colDataAppend(pColInfo, rows, (const char *)&numOfTables, false); } else if (i == 14) { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c9d1c3d015a07ef78bb28e4b1c4e9de9346a8b56..165f5da4b643987eee82afe4476f24dddd935b77 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -211,6 +211,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version); void tdRSmaFSClose(SRSmaFS *fs); int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version); void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version); +int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat); int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2c53eb9e4743c0db1192a3cf9a90abbad28c2d67..4cd1971be02faf6a4cd29b0e2d2c710aa33cebf0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -643,11 +643,20 @@ typedef struct { TSDBROW row; } SRowInfo; +typedef struct SSttBlockLoadInfo { + SBlockData blockData[2]; + SArray *aSttBlk; + int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. + int32_t currentLoadBlockIndex; +} SSttBlockLoadInfo; + typedef struct SMergeTree { int8_t backward; SRBTree rbt; SArray *pIterList; SLDataIter *pIter; + bool destroyLoadInfo; + SSttBlockLoadInfo* pLoadInfo; } SMergeTree; typedef struct { @@ -657,12 +666,16 @@ typedef struct { } SSkmInfo; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); +SSttBlockLoadInfo* tCreateLastBlockLoadInfo(); +void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); +void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); + // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index fd300ef34a07960296113f61074b47d87ff60697..f08cefdb0471a03ef79cf0633402d2fbb475ae03 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -182,6 +182,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { SVnode *pVnode = pSma->pVnode; SRSmaFS *pFS = RSMA_FS(pStat); int64_t committed = pStat->commitAppliedVer; + int64_t fsMaxVer = -1; char qTaskInfoFullName[TSDB_FILENAME_LEN]; taosWLockLatch(RSMA_FS_LOCK(pStat)); @@ -204,10 +205,20 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { ++i; } - SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; - if (tdRSmaFSUpsertQTaskFile(pFS, &qFile) < 0) { - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - return TSDB_CODE_FAILED; + if (taosArrayGetSize(pFS->aQTaskInf) > 0) { + fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version; + } + + if (fsMaxVer < committed) { + SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; + if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) { + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } else { + smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode), + committed, fsMaxVer); } taosWUnLockLatch(RSMA_FS_LOCK(pStat)); @@ -365,7 +376,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); // step 1: merge qTaskInfo and iQTaskInfo // lock diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 4387db553e6cfe21c0e14ff7b57ea388552ec969..8e8611f0e869d9e3d29c7456af142f05dba15b91 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -49,7 +49,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) { for (int32_t i = 0; i < taosArrayGetSize(output); ++i) { int32_t vid = 0; int64_t version = -1; - sscanf((const char *)taosArrayGetP(output, i), "v%dqinfo.v%" PRIi64, &vid, &version); + sscanf((const char *)taosArrayGetP(output, i), "v%dqinf.v%" PRIi64, &vid, &version); SQTaskFile qTaskFile = {.version = version, .nRef = 1}; if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) { goto _end; @@ -96,6 +96,18 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { return oldVal; } +int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) { + SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; + int64_t version = -1; + + taosRLockLatch(RSMA_FS_LOCK(pStat)); + if (taosArrayGetSize(aQTaskInf) > 0) { + version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version; + } + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + return version; +} + void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { SVnode *pVnode = pSma->pVnode; SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index fdd173cdf7b52ab7fc52738fd8fda29c25d08e72..920d04bf6c679a6a0110f7e9d53093421ff77061 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1342,29 +1342,31 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF return TSDB_CODE_FAILED; } - if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) { + if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) { return TSDB_CODE_FAILED; } int32_t infoLen = 0; - taosDecodeFixedI32(pIter->qBuf, &infoLen); + taosDecodeFixedI32(pIter->pBuf, &infoLen); if (infoLen > nBytes) { if (infoLen <= RSMA_QTASKINFO_BUFSIZE) { terrno = TSDB_CODE_RSMA_FILE_CORRUPTED; smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr()); return TSDB_CODE_FAILED; } - pIter->nAlloc = infoLen; - void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen); - if (!pBuf) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + if (pIter->nAlloc < infoLen) { + pIter->nAlloc = infoLen; + void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen); + if (!pBuf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + pIter->pBuf = pBuf; } - pIter->pBuf = pBuf; - pIter->qBuf = pIter->pBuf; + nBytes = infoLen; - if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) { + if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) { return TSDB_CODE_FAILED; } @@ -1373,6 +1375,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF } } + pIter->qBuf = pIter->pBuf; pIter->offset += nBytes; pIter->nBytes = nBytes; pIter->nBufPos = 0; @@ -1450,17 +1453,24 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { return TSDB_CODE_SUCCESS; } + int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); + if (pRSmaStat->commitAppliedVer <= fsMaxVer) { + smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, + pRSmaStat->commitAppliedVer, fsMaxVer); + return TSDB_CODE_SUCCESS; + } + STFile tFile = {0}; #if 0 if (pRSmaStat->commitAppliedVer > 0) { char qTaskInfoFName[TSDB_FILENAME_LEN]; tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { - smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); + smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; } if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { - smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); + smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); goto _err; } smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile)); @@ -1510,11 +1520,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { char qTaskInfoFName[TSDB_FILENAME_LEN]; tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { - smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); + smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; } if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { - smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); + smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); goto _err; } smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, @@ -1558,7 +1568,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } return TSDB_CODE_SUCCESS; _err: - smaError("vgId:%d, rsma persit failed since %s", vid, terrstr()); + smaError("vgId:%d, rsma persist failed since %s", vid, terrstr()); if (isFileCreated) { tdRemoveTFile(&tFile); tdDestroyTFile(&tFile); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 9ac62b4b59c51a397776789d9b80d66d978d45d4..8af764c4bcafee9e8793431c29307d3f5b446fe4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL); bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index a072f22fa985f6715f14469fd2b88b54d2e8b225..950c9348af69d8b4275785da95d0b64db490b811 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -22,26 +22,106 @@ struct SLDataIter { SDataFReader *pReader; int32_t iStt; int8_t backward; - SArray *aSttBlk; int32_t iSttBlk; - SBlockData bData[2]; - int32_t loadIndex; int32_t iRow; SRowInfo rInfo; uint64_t uid; STimeWindow timeWindow; SVersionRange verRange; + + SSttBlockLoadInfo* pBlockLoadInfo; }; -static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; } +SSttBlockLoadInfo* tCreateLastBlockLoadInfo() { + SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo)); + if (pLoadInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) { + pLoadInfo[i].blockIndex[0] = -1; + pLoadInfo[i].blockIndex[1] = -1; + pLoadInfo[i].currentLoadBlockIndex = 1; + + int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]); + if (code) { + terrno = code; + } + + code = tBlockDataCreate(&pLoadInfo[i].blockData[1]); + if (code) { + terrno = code; + } + + pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); + } + + return pLoadInfo; +} + +void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) { + for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) { + pLoadInfo[i].currentLoadBlockIndex = 1; + pLoadInfo[i].blockIndex[0] = -1; + pLoadInfo[i].blockIndex[1] = -1; -static SBlockData *getNextBlock(SLDataIter *pIter) { - pIter->loadIndex ^= 1; - return getCurrentBlock(pIter); + taosArrayClear(pLoadInfo[i].aSttBlk); + } +} + +void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) { + for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) { + pLoadInfo[i].currentLoadBlockIndex = 1; + pLoadInfo[i].blockIndex[0] = -1; + pLoadInfo[i].blockIndex[1] = -1; + + tBlockDataDestroy(&pLoadInfo[i].blockData[0], true); + tBlockDataDestroy(&pLoadInfo[i].blockData[1], true); + + taosArrayDestroy(pLoadInfo[i].aSttBlk); + } + + taosMemoryFree(pLoadInfo); + return NULL; +} + +static SBlockData* loadBlockIfMissing(SLDataIter *pIter) { + int32_t code = 0; + + SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; + if (pInfo->blockIndex[0] == pIter->iSttBlk) { + return &pInfo->blockData[0]; + } + + if (pInfo->blockIndex[1] == pIter->iSttBlk) { + return &pInfo->blockData[1]; + } + + pInfo->currentLoadBlockIndex ^= 1; + if (pIter->pSttBlk != NULL) { // current block not loaded yet + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]); + tsdbDebug("read last block, index:%d, last file index:%d", pIter->iSttBlk, pIter->iStt); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; + pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; + } + + return &pInfo->blockData[pInfo->currentLoadBlockIndex]; + + _exit: + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + } + + return NULL; } int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, - uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) { + uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) { int32_t code = 0; *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); if (*pIter == NULL) { @@ -55,34 +135,22 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t (*pIter)->backward = backward; (*pIter)->verRange = *pRange; (*pIter)->timeWindow = *pTimeWindow; - (*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if ((*pIter)->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - code = tBlockDataCreate(&(*pIter)->bData[0]); - if (code) { - goto _exit; - } - code = tBlockDataCreate(&(*pIter)->bData[1]); - if (code) { - goto _exit; - } - - code = tsdbReadSttBlk(pReader, iStt, (*pIter)->aSttBlk); - if (code) { - goto _exit; + (*pIter)->pBlockLoadInfo = pBlockLoadInfo; + if (taosArrayGetSize(pBlockLoadInfo->aSttBlk) == 0) { + code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk); + if (code) { + goto _exit; + } } - size_t size = taosArrayGetSize((*pIter)->aSttBlk); + size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); // find the start block int32_t index = -1; if (!backward) { // asc for (int32_t i = 0; i < size; ++i) { - SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); if (p->suid != suid) { continue; } @@ -94,7 +162,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t } } else { // desc for (int32_t i = size - 1; i >= 0; --i) { - SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); if (p->suid != suid) { continue; } @@ -108,7 +176,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t (*pIter)->iSttBlk = index; if (index != -1) { - (*pIter)->pSttBlk = taosArrayGet((*pIter)->aSttBlk, (*pIter)->iSttBlk); + (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); + (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1; } _exit: @@ -116,9 +185,6 @@ _exit: } void tLDataIterClose(SLDataIter *pIter) { - tBlockDataDestroy(&pIter->bData[0], 1); - tBlockDataDestroy(&pIter->bData[1], 1); - taosArrayDestroy(pIter->aSttBlk); taosMemoryFree(pIter); } @@ -127,9 +193,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) { pIter->iSttBlk += step; int32_t index = -1; - size_t size = taosArrayGetSize(pIter->aSttBlk); + size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { - SSttBlk *p = taosArrayGet(pIter->aSttBlk, i); + SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i); if ((!pIter->backward) && p->minUid > pIter->uid) { break; } @@ -169,7 +235,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { if (index == -1) { pIter->pSttBlk = NULL; } else { - pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); + pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); } } @@ -178,7 +244,8 @@ static void findNextValidRow(SLDataIter *pIter) { bool hasVal = false; int32_t i = pIter->iRow; - SBlockData *pBlockData = getCurrentBlock(pIter); + + SBlockData *pBlockData = loadBlockIfMissing(pIter); for (; i < pBlockData->nRow && i >= 0; i += step) { if (pBlockData->aUid != NULL) { @@ -238,19 +305,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) { return false; } - int32_t iBlockL = pIter->iSttBlk; - SBlockData *pBlockData = getCurrentBlock(pIter); - - if (pBlockData->nRow == 0 && pIter->pSttBlk != NULL) { // current block not loaded yet - pBlockData = getNextBlock(pIter); - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } - - pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1; - } - + int32_t iBlockL = pIter->iSttBlk; + SBlockData *pBlockData = loadBlockIfMissing(pIter); pIter->iRow += step; while (1) { @@ -266,12 +322,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) { } if (iBlockL != pIter->iSttBlk) { - pBlockData = getNextBlock(pIter); - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); - if (code) { - goto _exit; - } - pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0; + pBlockData = loadBlockIfMissing(pIter); + pIter->iRow += step; } } @@ -313,7 +365,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { } int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange) { + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo) { pMTree->backward = backward; pMTree->pIter = NULL; pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); @@ -322,21 +374,33 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead } tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); - int32_t code = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = TSDB_CODE_SUCCESS; + + SSttBlockLoadInfo* pLoadInfo = NULL; + if (pBlockLoadInfo == NULL) { + if (pMTree->pLoadInfo == NULL) { + pMTree->destroyLoadInfo = true; + pMTree->pLoadInfo = tCreateLastBlockLoadInfo(); + } + + pLoadInfo = pMTree->pLoadInfo; + } else { + pLoadInfo = pBlockLoadInfo; + } - struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0}; for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file - code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange); + struct SLDataIter* pIter = NULL; + code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]); if (code != TSDB_CODE_SUCCESS) { goto _end; } - bool hasVal = tLDataIterNextRow(pIterList[i]); + bool hasVal = tLDataIterNextRow(pIter); if (hasVal) { - taosArrayPush(pMTree->pIterList, &pIterList[i]); - tMergeTreeAddIter(pMTree, pIterList[i]); + taosArrayPush(pMTree->pIterList, &pIter); + tMergeTreeAddIter(pMTree, pIter); } else { - tLDataIterClose(pIterList[i]); + tLDataIterClose(pIter); } } @@ -393,4 +457,9 @@ void tMergeTreeClose(SMergeTree *pMTree) { pMTree->pIterList = taosArrayDestroy(pMTree->pIterList); pMTree->pIter = NULL; + + if (pMTree->destroyLoadInfo) { + pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo); + pMTree->destroyLoadInfo = false; + } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 8a51fc73a6380975dccc212ba1bf3c7bdc782a81..facfdb9a628c11c9b35e1dfb9a529b564308e7bb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -17,8 +17,6 @@ #include "tsdb.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) -#define ALL_ROWS_CHECKED_INDEX (INT16_MIN) -#define INITIAL_ROW_INDEX_VAL (-1) typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -88,6 +86,7 @@ typedef struct SLastBlockReader { int32_t order; uint64_t uid; SMergeTree mergeTree; + SSttBlockLoadInfo* pInfo; } SLastBlockReader; typedef struct SFilesetIter { @@ -226,13 +225,14 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK return NULL; } - int32_t step = ASCENDING_TRAVERSE(pTsdbReader->order)? 1:-1; for (int32_t j = 0; j < numOfTables; ++j) { STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { - info.lastKey = pTsdbReader->window.skey - step; + int64_t skey = pTsdbReader->window.skey; + info.lastKey = (skey > INT64_MIN)? (skey - 1):skey; } else { - info.lastKey = pTsdbReader->window.ekey - step; + int64_t ekey = pTsdbReader->window.ekey; + info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey; } taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); @@ -319,8 +319,7 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap } // init file iterator -static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, - STsdbReader* pReader /*int32_t order, const char* idstr*/) { +static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) { size_t numOfFileset = taosArrayGetSize(aDFileSet); pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset; @@ -345,6 +344,14 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); + if (pLReader->pInfo == NULL) { + pLReader->pInfo = tCreateLastBlockLoadInfo(); + if (pLReader->pInfo == NULL) { + tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); + return terrno; + } + } + tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -360,6 +367,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); + resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo); // check file the time range of coverage STimeWindow win = {0}; @@ -1377,7 +1385,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - // SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; @@ -1866,36 +1873,35 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc } } -static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader) { +static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. - if (pLastBlockReader->uid == pBlockScanInfo->uid) { + if (pLBlockReader->uid == pScanInfo->uid) { return true; } - if (pLastBlockReader->uid != 0) { - tMergeTreeClose(&pLastBlockReader->mergeTree); + if (pLBlockReader->uid != 0) { + tMergeTreeClose(&pLBlockReader->mergeTree); } - initMemDataIterator(pBlockScanInfo, pReader); - pLastBlockReader->uid = pBlockScanInfo->uid; + initMemDataIterator(pScanInfo, pReader); + pLBlockReader->uid = pScanInfo->uid; - int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1; - STimeWindow w = pLastBlockReader->window; - if (ASCENDING_TRAVERSE(pLastBlockReader->order)) { - w.skey = pBlockScanInfo->lastKey + step; + int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1; + STimeWindow w = pLBlockReader->window; + if (ASCENDING_TRAVERSE(pLBlockReader->order)) { + w.skey = pScanInfo->lastKey + step; } else { - w.ekey = pBlockScanInfo->lastKey + step; + w.ekey = pScanInfo->lastKey + step; } int32_t code = - tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, - pReader->suid, pBlockScanInfo->uid, &w, &pLastBlockReader->verRange); + tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo); if (code != TSDB_CODE_SUCCESS) { return false; } - return nextRowFromLastBlocks(pLastBlockReader, pBlockScanInfo); + return nextRowFromLastBlocks(pLBlockReader, pScanInfo); } static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { @@ -3305,6 +3311,7 @@ void tsdbReaderClose(STsdbReader* pReader) { SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree); + pFilesetIter->pLastBlockReader->pInfo = destroyLastBlockLoadInfo(pFilesetIter->pLastBlockReader->pInfo); taosMemoryFree(pFilesetIter->pLastBlockReader); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index dcf58759b01755f545f5b70f1b811218995fd2c2..f396f6e60948a3a669fc20ff2f453a692faa409e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2795,6 +2795,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); + // save selectivity value for column consisted of all null values + firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); return 0; } @@ -2871,7 +2873,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } } #endif - + if (numOfElems == 0) { + // save selectivity value for column consisted of all null values + firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + } SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -2892,6 +2897,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); + // save selectivity value for column consisted of all null values + firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); return 0; } @@ -2952,7 +2959,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #endif - + if (numOfElems == 0) { + // save selectivity value for column consisted of all null values + firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + } SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 5d894783fbada48446406db1d60da8121d1a690d..3f472b53a026de186bbab2e70d37997b2b2fda44 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1805,6 +1805,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { if (session->udfUvPipe == NULL) { fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName); + taosMemoryFree(session); return TSDB_CODE_UDF_PIPE_NO_PIPE; } @@ -1823,6 +1824,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); + //TODO: synchronization refactor between libuv event loop and request thread if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) { SClientUvConn *conn = session->udfUvPipe->data; conn->session = NULL; diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 691eb6771cfb0de12fe0328129dfa153b3eece8c..ecf91360734e37f0060aeb7758e5c4c5d57d4972 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -81,28 +81,28 @@ __compar_fn_t idxGetCompar(int8_t type) { } return getComparFunc(type, 0); } -static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { +static FORCE_INLINE TExeCond tCompareLessThan(void* a, void* b, int8_t type) { __compar_fn_t func = idxGetCompar(type); return tCompare(func, QUERY_LESS_THAN, a, b, type); } -static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { +static FORCE_INLINE TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { __compar_fn_t func = idxGetCompar(type); return tCompare(func, QUERY_LESS_EQUAL, a, b, type); } -static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { +static FORCE_INLINE TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { __compar_fn_t func = idxGetCompar(type); return tCompare(func, QUERY_GREATER_THAN, a, b, type); } -static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { +static FORCE_INLINE TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { __compar_fn_t func = idxGetCompar(type); return tCompare(func, QUERY_GREATER_EQUAL, a, b, type); } -static TExeCond tCompareContains(void* a, void* b, int8_t type) { +static FORCE_INLINE TExeCond tCompareContains(void* a, void* b, int8_t type) { __compar_fn_t func = idxGetCompar(type); return tCompare(func, QUERY_TERM, a, b, type); } -static TExeCond tCompareEqual(void* a, void* b, int8_t type) { +static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) { __compar_fn_t func = idxGetCompar(type); return tCompare(func, QUERY_TERM, a, b, type); } diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index e7f221de3d16ed0186f630ebdfe412f77d440dc3..b65acc467215da77019235e5ec44a335b363e344 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -88,7 +88,7 @@ typedef struct SIFCtx { SIndexMetaArg arg; } SIFCtx; -static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { +static FORCE_INLINE int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { if (src == OP_TYPE_GREATER_THAN) { *dst = QUERY_GREATER_THAN; } else if (src == OP_TYPE_GREATER_EQUAL) { @@ -110,10 +110,9 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { } typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output); - static sif_func_t sifNullFunc = NULL; -static void sifFreeParam(SIFParam *param) { +static FORCE_INLINE void sifFreeParam(SIFParam *param) { if (param == NULL) return; taosArrayDestroy(param->result); @@ -123,7 +122,7 @@ static void sifFreeParam(SIFParam *param) { param->pFilter = NULL; } -static int32_t sifGetOperParamNum(EOperatorType ty) { +static FORCE_INLINE int32_t sifGetOperParamNum(EOperatorType ty) { if (OP_TYPE_IS_NULL == ty || OP_TYPE_IS_NOT_NULL == ty || OP_TYPE_IS_TRUE == ty || OP_TYPE_IS_NOT_TRUE == ty || OP_TYPE_IS_FALSE == ty || OP_TYPE_IS_NOT_FALSE == ty || OP_TYPE_IS_UNKNOWN == ty || OP_TYPE_IS_NOT_UNKNOWN == ty || OP_TYPE_MINUS == ty) { @@ -131,14 +130,14 @@ static int32_t sifGetOperParamNum(EOperatorType ty) { } return 2; } -static int32_t sifValidOp(EOperatorType ty) { +static FORCE_INLINE int32_t sifValidOp(EOperatorType ty) { if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) || (ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) { return -1; } return 0; } -static int32_t sifValidColumn(SColumnNode *cn) { +static FORCE_INLINE int32_t sifValidColumn(SColumnNode *cn) { // add more check if (cn == NULL) { return TSDB_CODE_QRY_INVALID_INPUT; @@ -149,7 +148,7 @@ static int32_t sifValidColumn(SColumnNode *cn) { return TSDB_CODE_SUCCESS; } -static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SIdxFltStatus rs) { +static FORCE_INLINE SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SIdxFltStatus rs) { // enh rule later if (type == LOGIC_COND_TYPE_AND) { if (ls == SFLT_NOT_INDEX || rs == SFLT_NOT_INDEX) { @@ -167,7 +166,7 @@ static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SI return SFLT_NOT_INDEX; } -static int32_t sifGetValueFromNode(SNode *node, char **value) { +static FORCE_INLINE int32_t sifGetValueFromNode(SNode *node, char **value) { // covert data From snode; SValueNode *vn = (SValueNode *)node; @@ -205,7 +204,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) { return TSDB_CODE_SUCCESS; } -static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) { +static FORCE_INLINE int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) { SOperatorNode *nd = (SOperatorNode *)node; assert(nodeType(node) == QUERY_NODE_OPERATOR); SColumnNode *l = (SColumnNode *)nd->pLeft; @@ -355,30 +354,30 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu return TSDB_CODE_QRY_INVALID_INPUT; } -typedef int (*Filter)(void *a, void *b, int16_t dtype); +typedef int (*FilterFunc)(void *a, void *b, int16_t dtype); -int sifGreaterThan(void *a, void *b, int16_t dtype) { +static FORCE_INLINE int sifGreaterThan(void *a, void *b, int16_t dtype) { __compar_fn_t func = getComparFunc(dtype, 0); return tDoCompare(func, QUERY_GREATER_THAN, a, b); } -int sifGreaterEqual(void *a, void *b, int16_t dtype) { +static FORCE_INLINE int sifGreaterEqual(void *a, void *b, int16_t dtype) { __compar_fn_t func = getComparFunc(dtype, 0); return tDoCompare(func, QUERY_GREATER_EQUAL, a, b); } -int sifLessEqual(void *a, void *b, int16_t dtype) { +static FORCE_INLINE int sifLessEqual(void *a, void *b, int16_t dtype) { __compar_fn_t func = getComparFunc(dtype, 0); return tDoCompare(func, QUERY_LESS_EQUAL, a, b); } -int sifLessThan(void *a, void *b, int16_t dtype) { +static FORCE_INLINE int sifLessThan(void *a, void *b, int16_t dtype) { __compar_fn_t func = getComparFunc(dtype, 0); return (int)tDoCompare(func, QUERY_LESS_THAN, a, b); } -int sifEqual(void *a, void *b, int16_t dtype) { +static FORCE_INLINE int sifEqual(void *a, void *b, int16_t dtype) { __compar_fn_t func = getComparFunc(dtype, 0); //__compar_fn_t func = idxGetCompar(dtype); return (int)tDoCompare(func, QUERY_TERM, a, b); } -static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) { +static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reverse) { if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) { *reverse = true; } else { @@ -470,8 +469,8 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP indexMultiTermQueryAdd(mtm, tm, qtype); ret = indexJsonSearch(arg->ivtIdx, mtm, output->result); } else { - bool reverse; - Filter filterFunc = sifGetFilterFunc(qtype, &reverse); + bool reverse; + FilterFunc filterFunc = sifGetFilterFunc(qtype, &reverse); SMetaFltParam param = {.suid = arg->suid, .cid = left->colId, @@ -498,72 +497,72 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP return ret; } -static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_LOWER_THAN; return sifDoIndex(left, right, id, output); } -static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_LOWER_EQUAL; return sifDoIndex(left, right, id, output); } -static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_GREATER_THAN; return sifDoIndex(left, right, id, output); } -static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_GREATER_EQUAL; return sifDoIndex(left, right, id, output); } -static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_EQUAL; return sifDoIndex(left, right, id, output); } -static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NOT_EQUAL; return sifDoIndex(left, right, id, output); } -static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_IN; return sifDoIndex(left, right, id, output); } -static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NOT_IN; return sifDoIndex(left, right, id, output); } -static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_LIKE; return sifDoIndex(left, right, id, output); } -static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NOT_LIKE; return sifDoIndex(left, right, id, output); } -static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_MATCH; return sifDoIndex(left, right, id, output); } -static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_NMATCH; return sifDoIndex(left, right, id, output); } -static int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) { int id = OP_TYPE_JSON_CONTAINS; return sifDoIndex(left, right, id, output); } -static int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) { +static FORCE_INLINE int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) { // return 0 return 0; } -static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) { +static FORCE_INLINE int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) { // add more except return TSDB_CODE_QRY_INVALID_INPUT; } -static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *status) { +static FORCE_INLINE int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *status) { // impl later *status = SFLT_ACCURATE_INDEX; switch (funcId) { diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index f90c39278888c7838b0c4b1b5b434e4c06fb30a0..bec07b935e0f4fabea89664999604abff5b78c3c 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -502,7 +502,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { i64 offset; int ret; - offset = pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1); + offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1); if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { ASSERT(0); return -1; diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 275e7b42cc10ed2a77934da84ed544a1874e0b28..1559c85e23d59fec376890433f924522df8dc761 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -126,22 +126,22 @@ _OVER: return code; } -static void destroyHttpClient(SHttpClient* cli) { +static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { taosMemoryFree(cli->wbuf); taosMemoryFree(cli->rbuf); taosMemoryFree(cli->addr); taosMemoryFree(cli); } -static void clientCloseCb(uv_handle_t* handle) { +static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) { SHttpClient* cli = handle->data; destroyHttpClient(cli); } -static void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SHttpClient* cli = handle->data; buf->base = cli->rbuf; buf->len = HTTP_RECV_BUF_SIZE; } -static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { +static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { SHttpClient* cli = handle->data; if (nread < 0) { uError("http-report recv error:%s", uv_err_name(nread)); @@ -173,7 +173,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); } -static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { +static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4b5441f738d303fd8286e4cf856d1feacfee3eb5..dc2d937c49b7d8bbbcda9c4ca49571649d783834 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -69,11 +69,9 @@ typedef struct SCliThrd { SAsyncPool* asyncPool; uv_prepare_t* prepare; void* pool; // conn pool - + // timer handles SArray* timerList; - // msg queue - queue msg; TdThreadMutex msgMtx; SDelayQueue* delayQueue; @@ -108,7 +106,7 @@ static void cliReadTimeoutCb(uv_timer_t* handle); // register timer in each thread to clear expire conn // static void cliTimeoutCb(uv_timer_t* handle); // alloc buffer for recv -static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // callback after recv nbytes from socket static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after send data to socket @@ -132,10 +130,10 @@ static void cliSend(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); // cli util func -static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); -static void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); +static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); +static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); -static int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); +static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); // process data read from server, add decompress etc later static void cliHandleResp(SCliConn* conn); @@ -150,12 +148,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate}; -static void cliSendQuit(SCliThrd* thrd); -static void destroyUserdata(STransMsg* userdata); - -static int cliRBChoseIdx(STrans* pTransInst); +static FORCE_INLINE void destroyUserdata(STransMsg* userdata); +static FORCE_INLINE void destroyCmsg(void* cmsg); +static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); -static void destroyCmsg(void* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); // thread obj static SCliThrd* createThrdObj(); @@ -434,6 +430,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (pCtx == NULL || pCtx->pSem == NULL) { if (transMsg.info.ahandle == NULL) { + if (REQUEST_NO_RESP(&pMsg->msg)) destroyCmsg(pMsg); once = true; continue; } @@ -885,26 +882,23 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { } return conn; } -void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { +FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { if (pCvtAddr->cvt == false) { return; } - for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) { - if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { - memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN); - memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); - } + if (pEpSet->numOfEps == 1 && strncmp(pEpSet->eps[0].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { + memset(pEpSet->eps[0].fqdn, 0, TSDB_FQDN_LEN); + memcpy(pEpSet->eps[0].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); } } -bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { +FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { if (code != 0) return false; if (pCtx->retryCnt == 0) return false; if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false; return true; } - -int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { +FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { if (pMsg == NULL) return -1; memset(pResp, 0, sizeof(STransMsg)); @@ -980,6 +974,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } } + STraceId* trace = &pMsg->msg.info.traceId; + tGTrace("%s conn %p ready", pTransInst->label, conn); } static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; @@ -1128,14 +1124,15 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, return cli; } -static void destroyUserdata(STransMsg* userdata) { +FORCE_INLINE void destroyUserdata(STransMsg* userdata) { if (userdata->pCont == NULL) { return; } transFreeMsg(userdata->pCont); userdata->pCont = NULL; } -static void destroyCmsg(void* arg) { + +FORCE_INLINE void destroyCmsg(void* arg) { SCliMsg* pMsg = arg; if (pMsg == NULL) { return; @@ -1220,7 +1217,7 @@ void cliWalkCb(uv_handle_t* handle, void* arg) { } } -int cliRBChoseIdx(STrans* pTransInst) { +FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) { int8_t index = pTransInst->index; if (pTransInst->numOfThreads == 0) { return -1; @@ -1230,7 +1227,7 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } -static void doDelayTask(void* param) { +static FORCE_INLINE void doDelayTask(void* param) { STaskArg* arg = param; SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; @@ -1264,13 +1261,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); } -void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { +FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { if (*val != exp) { *val = newVal; } } -bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { +FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; } @@ -1300,7 +1297,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STrans* pTransInst = pThrd->pTransInst; if (pMsg == NULL || pMsg->ctx == NULL) { - tTrace("%s conn %p handle resp", pTransInst->label, pConn); + tDebug("%s conn %p handle resp", pTransInst->label, pConn); pTransInst->cfp(pTransInst->parent, pResp, NULL); return 0; } @@ -1402,7 +1399,7 @@ void transUnrefCliHandle(void* handle) { cliDestroyConn((SCliConn*)handle, true); } } -SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) { +static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) { SCliThrd* pThrd = NULL; SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh == NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index dea96fa3acf106f2956435dbff9b3511a8db1219..5f3171ee0e840ee7f558a13b4ad315bd9bcfb856 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -424,7 +424,7 @@ void transQueueDestroy(STransQueue* queue) { taosArrayDestroy(queue->q); } -static int32_t timeCompare(const HeapNode* a, const HeapNode* b) { +static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) { SDelayTask* arg1 = container_of(a, SDelayTask, node); SDelayTask* arg2 = container_of(b, SDelayTask, node); if (arg1->execTime > arg2->execTime) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 7007079f8760c944c70bbfe0798fb0fc518cf958..c00cafb708298a5761994fa2143d555870b15dd5 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -125,17 +125,17 @@ static void uvWorkAfterTask(uv_work_t* req, int status); static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvFreeCb(uv_handle_t* handle); -static void uvStartSendRespImpl(SSvrMsg* smsg); +static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg); + static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSvrMsg* msg); static void uvNotifyLinkBrokenToApp(SSvrConn* conn); -static void destroySmsg(SSvrMsg* smsg); -// check whether already read complete packet -static SSvrConn* createConn(void* hThrd); -static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); -static void destroyConnRegArg(SSvrConn* conn); +static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); +static FORCE_INLINE SSvrConn* createConn(void* hThrd); +static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); +static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); static int reallocConnRef(SSvrConn* conn); @@ -413,7 +413,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { wb->len = len; } -static void uvStartSendRespImpl(SSvrMsg* smsg) { +static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { SSvrConn* pConn = smsg->pConn; if (pConn->broken) { return; @@ -447,7 +447,7 @@ static void uvStartSendResp(SSvrMsg* smsg) { return; } -static void destroySmsg(SSvrMsg* smsg) { +static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { if (smsg == NULL) { return; } @@ -812,7 +812,7 @@ void* transWorkerThread(void* arg) { return NULL; } -static SSvrConn* createConn(void* hThrd) { +static FORCE_INLINE SSvrConn* createConn(void* hThrd) { SWorkThrd* pThrd = hThrd; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); @@ -842,7 +842,7 @@ static SSvrConn* createConn(void* hThrd) { return pConn; } -static void destroyConn(SSvrConn* conn, bool clear) { +static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) { if (conn == NULL) { return; } @@ -854,7 +854,7 @@ static void destroyConn(SSvrConn* conn, bool clear) { } } } -static void destroyConnRegArg(SSvrConn* conn) { +static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { if (conn->regArg.init == 1) { transFreeMsg(conn->regArg.msg.pCont); conn->regArg.init = 0; diff --git a/tests/script/tsim/parser/commit.sim b/tests/script/tsim/parser/commit.sim index ae19a4803bc6bd0f75c2a307696d7b0fc6f1ecb6..a9bf8b26ebda178ed789f3119eac7203a6889f9b 100644 --- a/tests/script/tsim/parser/commit.sim +++ b/tests/script/tsim/parser/commit.sim @@ -97,6 +97,7 @@ while $loop <= $loops endw sql select count(*) from $stb if $data00 != $totalNum then + print expect $totalNum , actual: $data00 return -1 endi $loop = $loop + 1