diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 4da2d80fbb21f0b1d132a52d86c29bf3b8c09cd3..37dd448b87e398f992b6dd6cd22815796da2a957 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG c4a567b + GIT_TAG 6a2d9fc SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/examples/go/go.mod b/docs/examples/go/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..716a0ef5dc91b4d3598c8af304204edb99e9b584 --- /dev/null +++ b/docs/examples/go/go.mod @@ -0,0 +1,6 @@ +module goexample + +go 1.17 + +require github.com/taosdata/driver-go/v3 v3.1.0 + diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 9ca6a7a9fafe464f1f82682b9e3599d15b6dff79..411c7d5e8b22640f4fd67651b3de8f7a86bb171e 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -218,6 +218,7 @@ bool fmIsKeepOrderFunc(int32_t funcId); bool fmIsCumulativeFunc(int32_t funcId); bool fmIsInterpPseudoColumnFunc(int32_t funcId); bool fmIsGroupKeyFunc(int32_t funcId); +bool fmIsBlockDistFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b5b99e92b07b155ab69a6d41f3d793ed697dab7f..07046eb04108f86e6dbb46ac053f35210a5d4c11 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1402,8 +1402,6 @@ int32_t doProcessMsgFromServer(void* param) { tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); } - - taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } } @@ -1423,6 +1421,11 @@ int32_t doProcessMsgFromServer(void* param) { } pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); + + if (pTscObj) { + taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); + } + rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 16fe6c1b91f8a7788ad4a48993b337fe6bc7b160..cf4eaaf7d1b205d23360e93f444fd2b7495cc122 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -167,6 +167,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index d504a94700096162d636390ee6eb5e2d2f77f139..471c53b2f536cb45f52219096c728aae4f13fc01 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -41,6 +41,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq); static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReq(SRpcMsg *pReq); +static int32_t mndProcessDropTtltbReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); @@ -64,6 +65,7 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbReq); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); @@ -2176,6 +2178,10 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, return 0; } +static int32_t mndProcessDropTtltbReq(SRpcMsg *pRsp) { + return 0; +} + static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6b675586e47a9116c9043991c17fa58984d0a44f..d458ffaed459ebcb01c3f61b70515b38288e573b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -85,7 +85,11 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta pRaw, pMgmt->transSec, pMgmt->transSeq); if (pMeta->code == 0) { - sdbWriteWithoutFree(pMnode->pSdb, pRaw); + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); + if (code != 0) { + mError("trans:%d, failed to write to sdb since %s", transId, terrstr()); + return 0; + } sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index dfcd55bcba7ca6a46a79cac2351df18a135218db..0b28a6eb43b31d8df8bb80e4c9d5e4dfa0919413 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -572,8 +572,20 @@ static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) { } static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { - mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, - mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); + mTrace("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64, + pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), + pNew->createdTime); + + if (pOld->createdTime != pNew->createdTime) { + mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64 + ", new row:%p stage:%s create:%" PRId64, + pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), + pNew->createdTime); + // only occured while sync timeout + terrno = TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT; + return -1; + } + mndTransUpdateActions(pOld->redoActions, pNew->redoActions); mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2efb00ae32c510211a0ac0d6c44450fa1b48464f..ec66e581efa3cba7f38dcc675ac71946172c1694 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -321,6 +321,8 @@ struct STsdb { STsdbFS fs; SLRUCache *lruCache; TdThreadMutex lruMutex; + SLRUCache *biCache; + TdThreadMutex biMutex; }; struct TSDBKEY { @@ -746,6 +748,9 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); +int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle); +int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h); + int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 0fc5b617bbb83e514ff8e00295d68295f70d1bb1..fb2efda8e4cf46febc2880da7c13e37fe61e0c15 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -15,6 +15,34 @@ #include "tsdb.h" +static int32_t tsdbOpenBICache(STsdb *pTsdb) { + int32_t code = 0; + SLRUCache *pCache = taosLRUCacheInit(5 * 1024 * 1024, -1, .5); + if (pCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + taosLRUCacheSetStrictCapacity(pCache, false); + + taosThreadMutexInit(&pTsdb->biMutex, NULL); + +_err: + pTsdb->biCache = pCache; + return code; +} + +static void tsdbCloseBICache(STsdb *pTsdb) { + SLRUCache *pCache = pTsdb->biCache; + if (pCache) { + taosLRUCacheEraseUnrefEntries(pCache); + + taosLRUCacheCleanup(pCache); + + taosThreadMutexDestroy(&pTsdb->biMutex); + } +} + int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = NULL; @@ -26,6 +54,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } + code = tsdbOpenBICache(pTsdb); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + taosLRUCacheSetStrictCapacity(pCache, false); taosThreadMutexInit(&pTsdb->lruMutex, NULL); @@ -44,6 +78,8 @@ void tsdbCloseCache(STsdb *pTsdb) { taosThreadMutexDestroy(&pTsdb->lruMutex); } + + tsdbCloseBICache(pTsdb); } static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { @@ -1475,3 +1511,84 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) { return usage; } + +static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) { + struct { + int32_t fid; + int64_t commitID; + } biKey = {0}; + + biKey.fid = fid; + biKey.commitID = commitID; + + *len = sizeof(biKey); + memcpy(key, &biKey, *len); +} + +static int32_t tsdbCacheLoadBlockIdx(SDataFReader *pFileReader, SArray **aBlockIdx) { + SArray *pArray = taosArrayInit(8, sizeof(SBlockIdx)); + int32_t code = tsdbReadBlockIdx(pFileReader, pArray); + + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pArray); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + *aBlockIdx = pArray; + + return code; +} + +static void deleteBICache(const void *key, size_t keyLen, void *value) { + SArray *pArray = (SArray *)value; + + taosArrayDestroy(pArray); +} + +int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle) { + int32_t code = 0; + char key[128] = {0}; + int keyLen = 0; + + getBICacheKey(pFileReader->pSet->fid, pFileReader->pSet->pHeadF->commitID, key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + STsdb *pTsdb = pFileReader->pTsdb; + taosThreadMutexLock(&pTsdb->biMutex); + + h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + SArray *pArray = NULL; + code = tsdbCacheLoadBlockIdx(pFileReader, &pArray); + // if table's empty or error, return code of -1 + if (code != TSDB_CODE_SUCCESS || pArray == NULL) { + taosThreadMutexUnlock(&pTsdb->biMutex); + + *handle = NULL; + return 0; + } + + size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray); + _taos_lru_deleter_t deleter = deleteBICache; + LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + } + + taosThreadMutexUnlock(&pTsdb->biMutex); + } + + *handle = h; + + return code; +} + +int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) { + int32_t code = 0; + + taosLRUCacheRelease(pCache, h, false); + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 6532a997462ff5e28ddf56f65c57350ad6b5e35f..aa5d8fa3a4395b49b47ff233697f914d37cac980 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -79,16 +79,19 @@ typedef struct SIOCostSummary { int64_t composedBlocks; double buildComposedBlockTime; double createScanInfoList; + double getTbFromMemTime; + double getTbFromIMemTime; + double initDelSkylineIterTime; } SIOCostSummary; typedef struct SBlockLoadSuppInfo { - SArray* pColAgg; - SColumnDataAgg tsColAgg; - int16_t* colId; - int16_t* slotId; - int32_t numOfCols; - char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. - bool smaValid; // the sma on all queried columns are activated + SArray* pColAgg; + SColumnDataAgg tsColAgg; + int16_t* colId; + int16_t* slotId; + int32_t numOfCols; + char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. + bool smaValid; // the sma on all queried columns are activated } SBlockLoadSuppInfo; typedef struct SLastBlockReader { @@ -168,11 +171,11 @@ struct STsdbReader { SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; SIOCostSummary cost; - STSchema* pSchema; // the newest version schema - STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times - SDataFReader* pFileReader; // the file reader - SDelFReader* pDelFReader; // the del file reader - SArray* pDelIdx; // del file block index; + STSchema* pSchema; // the newest version schema + STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times + SDataFReader* pFileReader; // the file reader + SDelFReader* pDelFReader; // the del file reader + SArray* pDelIdx; // del file block index; SVersionRange verRange; SBlockInfoBuf blockInfoBuf; int32_t step; @@ -219,17 +222,18 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { +static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, + int32_t numOfCols) { pSupInfo->smaValid = true; pSupInfo->numOfCols = numOfCols; - pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t)*2 + POINTER_BYTES)); + pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES)); if (pSupInfo->colId == NULL) { taosMemoryFree(pSupInfo->colId); return TSDB_CODE_OUT_OF_MEMORY; } pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols)); - pSupInfo->buildBuf = (char**) ((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols)); + pSupInfo->buildBuf = (char**)((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols)); for (int32_t i = 0; i < numOfCols; ++i) { pSupInfo->colId[i] = pCols[i].colId; pSupInfo->slotId[i] = pSlotIdList[i]; @@ -247,7 +251,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) { int32_t i = 0, j = 0; - while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) { + while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) { STColumn* pTCol = &pSchema->columns[i]; if (pTCol->colId == pSupInfo->colId[j]) { if (!IS_BSMA_ON(pTCol)) { @@ -312,7 +316,8 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) { } // NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model -static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) { +static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, + int32_t numOfTables) { // allocate buffer in order to load data blocks from file // todo use simple hash instead, optimize the memory consumption SHashObj* pTableMap = @@ -398,9 +403,7 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) { taosHashCleanup(pTableMap); } -static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { - return pWindow->skey > pWindow->ekey; -} +static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; } // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // the expired data to client, even it is queried already. @@ -644,17 +647,21 @@ _end: } static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { - SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx)); + // SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx)); int64_t st = taosGetTimestampUs(); - int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx); - if (code != TSDB_CODE_SUCCESS) { + // int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx); + LRUHandle* handle = NULL; + int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle); + if (code != TSDB_CODE_SUCCESS || handle == NULL) { goto _end; } - size_t num = taosArrayGetSize(aBlockIdx); + SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle); + size_t num = taosArrayGetSize(aBlockIdx); if (num == 0) { - taosArrayDestroy(aBlockIdx); + tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); + // taosArrayDestroy(aBlockIdx); return TSDB_CODE_SUCCESS; } @@ -690,7 +697,8 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, pReader->cost.headFileLoadTime += (et1 - st) / 1000.0; _end: - taosArrayDestroy(aBlockIdx); + // taosArrayDestroy(aBlockIdx); + tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); return code; } @@ -769,7 +777,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el, pReader->idStr); - pReader->cost.numOfBlocks += total; pReader->cost.headFileLoadTime += el; @@ -903,7 +910,7 @@ static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* p // a faster version of copy procedure. static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, - int32_t dumpedRows, bool asc) { + int32_t dumpedRows, bool asc) { uint8_t* p = NULL; if (asc) { p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; @@ -912,22 +919,21 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo p = pData->pData + tDataTypes[pData->type].bytes * startIndex; } - int32_t step = asc? 1:-1; + int32_t step = asc ? 1 : -1; // make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit -// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); + // ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // 1. copy data in a batch model memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); // 2. reverse the array list in case of descending order scan data block if (!asc) { - switch(pColData->info.type) { + switch (pColData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: - { + case TSDB_DATA_TYPE_UBIGINT: { int32_t mid = dumpedRows >> 1u; int64_t* pts = (int64_t*)pColData->pData; for (int32_t j = 0; j < mid; ++j) { @@ -941,7 +947,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: { - int32_t mid = dumpedRows >> 1u; + int32_t mid = dumpedRows >> 1u; int8_t* pts = (int8_t*)pColData->pData; for (int32_t j = 0; j < mid; ++j) { int8_t t = pts[j]; @@ -1113,7 +1119,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%"PRIu64" elapsed time:%.2f ms, %s", + ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows, unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr); @@ -1766,7 +1772,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tRowMergerAdd(&merge, pRow, pSchema); } else { init = true; - int32_t code = tRowMergerInit(&merge, pRow, pSchema); + int32_t code = tRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2177,10 +2183,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea } int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); + int64_t st = 0; STbData* d = NULL; if (pReader->pReadSnap->pMem != NULL) { + st = taosGetTimestampUs(); d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); + pReader->cost.getTbFromMemTime += (taosGetTimestampUs() - st) / 1000.0; if (d != NULL) { code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -2201,7 +2210,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea STbData* di = NULL; if (pReader->pReadSnap->pIMem != NULL) { + st = taosGetTimestampUs(); di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid); + pReader->cost.getTbFromIMemTime += (taosGetTimestampUs() - st) / 1000.0; if (di != NULL) { code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -2220,7 +2231,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); } + st = taosGetTimestampUs(); initDelSkylineIterator(pBlockScanInfo, pReader, d, di); + pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0; pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; @@ -2296,7 +2309,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLas bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { - return false; // this is an invalid result. + return false; // this is an invalid result. } return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); } @@ -2447,7 +2460,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); // it is a clean block, load it directly - if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && + if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && pBlock->nRow <= pReader->capacity) { if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { copyBlockDataToSDataBlock(pReader, pBlockScanInfo); @@ -2611,7 +2624,7 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) hasIKey = true; ikey = TSDBROW_KEY(pIRow); } - + if (hasKey) { if (hasIKey) { // has data in mem & imem if (asc) { @@ -2666,7 +2679,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { taosArrayDestroy(pIndexList); if (pReader->pReadSnap != NULL) { - SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile; if (pReader->pDelFReader == NULL && pDelFile != NULL) { int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb); @@ -2854,7 +2866,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ASSERT(pBlockInfo != NULL); -// if (pBlockInfo != NULL) { + // if (pBlockInfo != NULL) { pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); // } else { // pScanInfo = *pReader->status.pTableIter; @@ -2866,9 +2878,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } -// if (pBlockInfo != NULL) { + // if (pBlockInfo != NULL) { pBlock = getCurrentBlock(pBlockIter); -// } + // } initLastBlockReader(pLastBlockReader, pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); @@ -2933,7 +2945,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // update the last key for the corresponding table pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " + tsdbDebug("%p uid:%" PRIu64 + " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts, pBlock->maxKey.ts, pReader->idStr); @@ -3188,7 +3201,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32 return false; } else if (pKey->ts == last->ts) { TSDBKEY* prev = taosArrayGet(pDelList, num - 2); - return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer); + return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && + prev->version >= pVerRange->minVer); } } else { TSDBKEY* pCurrent = taosArrayGet(pDelList, *index); @@ -3372,7 +3386,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - bool loadNeighbor = true; + bool loadNeighbor = true; int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor); if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) { @@ -3624,7 +3638,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* SColVal colVal = {0}; int32_t i = 0, j = 0; - if (pSupInfo->colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) { + if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]); ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts; i += 1; @@ -3669,7 +3683,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S int32_t outputRowIndex = pResBlock->info.rows; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - if (pReader->suppInfo.colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) { + if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]); ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex]; i += 1; @@ -4011,14 +4025,17 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pLReader); } - tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 - " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, " - "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64 - ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms, %s", - pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, - pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, - pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime, - numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr); + tsdbDebug( + "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 + " SMA-time:%.2f ms, fileBlocks:%" PRId64 + ", fileBlocks-load-time:%.2f ms, " + "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64 + ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms," + "getTbFromMem-time:%.2f ms, getTbFromIMem-time:%.2f ms, initDelSkylineIterTime:%.2f ms, %s", + pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks, + pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks, + pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, + pCost->getTbFromMemTime, pCost->getTbFromIMemTime, pCost->initDelSkylineIterTime, pReader->idStr); taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); @@ -4034,7 +4051,7 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) { blockDataCleanup(pBlock); SReaderStatus* pStatus = &pReader->status; - if (taosHashGetSize(pStatus->pTableMap) == 0){ + if (taosHashGetSize(pStatus->pTableMap) == 0) { return false; } @@ -4124,12 +4141,10 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64 } } - -static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, - SColumnDataAgg* pTsAgg) { +static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) { // do fill all null column value SMA info int32_t i = 0, j = 0; - int32_t size = (int32_t) taosArrayGetSize(pSup->pColAgg); + int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg); taosArrayInsert(pSup->pColAgg, 0, pTsAgg); while (j < numOfCols && i < size) { @@ -4142,7 +4157,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ } else if (pSup->colId[j] < pAgg->colId) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; - taosArrayInsert(pSup->pColAgg, i ,&nullColAgg); + taosArrayInsert(pSup->pColAgg, i, &nullColAgg); } j += 1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 98c9c0fddab29327cb239aaa632a972664faef84..e8181f922f0262a72f2f40bed319a17008738cbb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1205,7 +1205,7 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) { TSDB_CHECK_CODE(code, lino, _exit); } - tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk); + tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk); pWriter->pDIter->dIter.iDataBlk++; } else { code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e2d4b92836d8f6c25cdd4eb5838553fdef167ba5..a252c6deb6f0547edc743a438310f4ab6622b39d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -929,8 +929,9 @@ int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) { return code; } +/* // delete skyline ====================================================== -static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) { +static int32_t tsdbMergeSkyline2(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) { int32_t code = 0; int32_t i1 = 0; int32_t n1 = taosArrayGetSize(aSkyline1); @@ -996,7 +997,141 @@ static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aS _exit: return code; } +*/ + +// delete skyline ====================================================== +static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) { + int32_t code = 0; + int32_t i1 = 0; + int32_t n1 = taosArrayGetSize(pSkyline1); + int32_t i2 = 0; + int32_t n2 = taosArrayGetSize(pSkyline2); + TSDBKEY *pKey1; + TSDBKEY *pKey2; + int64_t version1 = 0; + int64_t version2 = 0; + + ASSERT(n1 > 0 && n2 > 0); + + taosArrayClear(pSkyline); + TSDBKEY **pItem = TARRAY_GET_ELEM(pSkyline, 0); + + while (i1 < n1 && i2 < n2) { + pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1); + pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2); + + if (pKey1->ts < pKey2->ts) { + version1 = pKey1->version; + *pItem = pKey1; + i1++; + } else if (pKey1->ts > pKey2->ts) { + version2 = pKey2->version; + *pItem = pKey2; + i2++; + } else { + version1 = pKey1->version; + version2 = pKey2->version; + *pItem = pKey1; + i1++; + i2++; + } + + (*pItem)->version = TMAX(version1, version2); + pItem++; + } + + while (i1 < n1) { + pKey1 = (TSDBKEY *)taosArrayGetP(pSkyline1, i1); + *pItem = pKey1; + pItem++; + i1++; + } + + while (i2 < n2) { + pKey2 = (TSDBKEY *)taosArrayGetP(pSkyline2, i2); + *pItem = pKey2; + pItem++; + i2++; + } + + taosArraySetSize(pSkyline, TARRAY_ELEM_IDX(pSkyline, pItem)); + +_exit: + return code; +} + + +int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) { + int32_t code = 0; + SDelData *pDelData; + int32_t midx; + + taosArrayClear(pSkyline); + if (sidx == eidx) { + TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2); + TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1); + taosArrayPush(pSkyline, &pItem1); + taosArrayPush(pSkyline, &pItem2); + } else { + SArray *pSkyline1 = NULL; + SArray *pSkyline2 = NULL; + midx = (sidx + eidx) / 2; + + pSkyline1 = taosArrayInit((midx - sidx + 1) * 2, POINTER_BYTES); + pSkyline2 = taosArrayInit((eidx - midx) * 2, POINTER_BYTES); + if (pSkyline1 == NULL || pSkyline1 == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _clear; + } + + code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1); + if (code) goto _clear; + + code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2); + if (code) goto _clear; + + code = tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline); + + _clear: + taosArrayDestroy(pSkyline1); + taosArrayDestroy(pSkyline2); + } + + return code; +} + + int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) { + SDelData *pDelData; + int32_t code = 0; + int32_t dataNum = eidx - sidx + 1; + SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); + SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); + + for (int32_t i = sidx; i <= eidx; ++i) { + pDelData = (SDelData *)taosArrayGet(aDelData, i); + taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}); + taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}); + } + + code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline); + if (code) goto _clear; + + int32_t skylineNum = taosArrayGetSize(pSkyline); + for (int32_t i = 0; i < skylineNum; ++i) { + TSDBKEY *p = taosArrayGetP(pSkyline, i); + taosArrayPush(aSkyline, p); + } + +_clear: + taosArrayDestroy(aTmpSkyline); + taosArrayDestroy(pSkyline); + + return code; +} + +/* +int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) { int32_t code = 0; SDelData *pDelData; int32_t midx; @@ -1033,6 +1168,7 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr return code; } +*/ // SBlockData ====================================================== int32_t tBlockDataCreate(SBlockData *pBlockData) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 2d921d43d34726b64b0be3dde97a52a95cf19f61..826af20224cfe2bde625ac564942df7a54e9c605 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -447,32 +447,6 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t taosResetFillInfo(pFillInfo, skey); - switch (fillType) { - case FILL_MODE_NONE: - pFillInfo->type = TSDB_FILL_NONE; - break; - case FILL_MODE_PREV: - pFillInfo->type = TSDB_FILL_PREV; - break; - case FILL_MODE_NULL: - pFillInfo->type = TSDB_FILL_NULL; - break; - case FILL_MODE_LINEAR: - pFillInfo->type = TSDB_FILL_LINEAR; - break; - case FILL_MODE_NEXT: - pFillInfo->type = TSDB_FILL_NEXT; - break; - case FILL_MODE_VALUE: - pFillInfo->type = TSDB_FILL_SET_VALUE; - break; - default: { - taosMemoryFree(pFillInfo); - terrno = TSDB_CODE_INVALID_PARA; - return NULL; - } - } - pFillInfo->type = fillType; pFillInfo->pFillCol = pCol; pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols; diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 6ab91a4483eeaf0dabf5968c6f0cd424cb80fb96..e3127fcd7ba233d478c58cc16b7543587b5eaf90 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -262,6 +262,13 @@ bool fmIsGroupKeyFunc(int32_t funcId) { return FUNCTION_TYPE_GROUP_KEY == funcMgtBuiltins[funcId].type; } +bool fmIsBlockDistFunc(int32_t funcId) { + if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return false; + } + return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type; +} + void fmFuncMgtDestroy() { void* m = gFunMgtService.pFuncNameHashTable; if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) { diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 5cc72f86923762454c8cae66ef9bbc7828b9ef05..9dd4daec3b3905740aa8ee2d25bcbd7751279aec 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -22,11 +22,11 @@ struct SToken; #define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) -#define NEXT_TOKEN(pSql, sToken) \ - do { \ - int32_t index = 0; \ - sToken = tStrGetToken(pSql, &index, false); \ - pSql += index; \ +#define NEXT_TOKEN(pSql, sToken) \ + do { \ + int32_t index = 0; \ + sToken = tStrGetToken(pSql, &index, false, NULL); \ + pSql += index; \ } while (0) #define CHECK_CODE(expr) \ diff --git a/source/libs/parser/inc/parToken.h b/source/libs/parser/inc/parToken.h index fb4b46aa35094fd3ac65171c5499c038eb92d233..d539e8b37bfbab8cd0995f2f77c8228809b07f7e 100644 --- a/source/libs/parser/inc/parToken.h +++ b/source/libs/parser/inc/parToken.h @@ -55,7 +55,7 @@ uint32_t tGetToken(const char *z, uint32_t *tokenType); * @param isPrevOptr * @return */ -SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr); +SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr, bool *pIgnoreComma); /** * check if it is a keyword or not diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 1e1821842fb39139d961fae78b9dfad6f294a5d4..abee9416faf733d437a34f089f74ec1b1f20e7ed 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -18,16 +18,23 @@ #include "tglobal.h" #include "ttime.h" -#define NEXT_TOKEN_WITH_PREV(pSql, token) \ - do { \ - int32_t index = 0; \ - token = tStrGetToken(pSql, &index, true); \ - pSql += index; \ +#define NEXT_TOKEN_WITH_PREV(pSql, token) \ + do { \ + int32_t index = 0; \ + token = tStrGetToken(pSql, &index, true, NULL); \ + pSql += index; \ } while (0) -#define NEXT_TOKEN_KEEP_SQL(pSql, token, index) \ - do { \ - token = tStrGetToken(pSql, &index, false); \ +#define NEXT_TOKEN_WITH_PREV_EXT(pSql, token, pIgnoreComma) \ + do { \ + int32_t index = 0; \ + token = tStrGetToken(pSql, &index, true, pIgnoreComma); \ + pSql += index; \ + } while (0) + +#define NEXT_TOKEN_KEEP_SQL(pSql, token, index) \ + do { \ + token = tStrGetToken(pSql, &index, false, NULL); \ } while (0) #define NEXT_VALID_TOKEN(pSql, token) \ @@ -302,12 +309,12 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t * e.g., now+12a, now-5h */ index = 0; - SToken token = tStrGetToken(pTokenEnd, &index, false); + SToken token = tStrGetToken(pTokenEnd, &index, false, NULL); pTokenEnd += index; if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) { index = 0; - SToken valueToken = tStrGetToken(pTokenEnd, &index, false); + SToken valueToken = tStrGetToken(pTokenEnd, &index, false, NULL); pTokenEnd += index; if (valueToken.n < 2) { @@ -1240,30 +1247,35 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB int32_t code = tdSRowResetBuf(pBuilder, row); // 1. set the parsed value from sql string for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) { - NEXT_TOKEN_WITH_PREV(*pSql, *pToken); - SSchema* pSchema = &pSchemas[pCols->boundColumns[i]]; + const char* pOrigSql = *pSql; + bool ignoreComma = false; + NEXT_TOKEN_WITH_PREV_EXT(*pSql, *pToken, &ignoreComma); + if (ignoreComma) { + code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql); + break; + } if (pToken->type == TK_NK_QUESTION) { isParseBindParam = true; if (NULL == pCxt->pComCxt->pStmtCb) { code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z); + break; + } + } else { + if (TK_NK_RP == pToken->type) { + code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); + break; } - continue; - } - - if (TSDB_CODE_SUCCESS == code && TK_NK_RP == pToken->type) { - code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); - } - if (TSDB_CODE_SUCCESS == code && isParseBindParam) { - code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); - } + if (isParseBindParam) { + code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); + break; + } - if (TSDB_CODE_SUCCESS == code) { - param.schema = pSchema; + param.schema = &pSchemas[pCols->boundColumns[i]]; insGetSTSRowAppendInfo(pBuilder->rowType, pCols, i, ¶m.toffset, ¶m.colIdx); - code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pDataBuf->pTableMeta).precision, insMemRowAppend, - ¶m); + code = parseValueToken(pCxt, pSql, pToken, param.schema, getTableInfo(pDataBuf->pTableMeta).precision, + insMemRowAppend, ¶m); } if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) { diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index e62b2f0f5ab2db843d41009bb21e68a60f19cd67..6a96ff9c504c957b86d91ea96075a98da76e4478 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -620,7 +620,7 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) { return 0; } -SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) { +SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreComma) { SToken t0 = {0}; // here we reach the end of sql string, null-terminated string @@ -641,6 +641,10 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) { return t0; } + if (NULL != pIgnoreComma && t == ',') { + *pIgnoreComma = true; + } + t = str[++(*i)]; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 183d1b0c6ef0300bcad5045c4822191077e8fc90..ec834a6ba1435f036f84940a7fe6a9fbbbdee84e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1561,6 +1561,26 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p return TSDB_CODE_SUCCESS; } +static int32_t translateBlockDistFunc(STranslateContext* pCtx, SFunctionNode* pFunc) { + if (!fmIsBlockDistFunc(pFunc->funcId)) { + return TSDB_CODE_SUCCESS; + } + if (!isSelectStmt(pCtx->pCurrStmt)) { + return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, + "%s is only supported in single table query", pFunc->functionName); + } + SSelectStmt* pSelect = (SSelectStmt*)pCtx->pCurrStmt; + SNode* pTable = pSelect->pFromTable; + if (NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) || + (TSDB_SUPER_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && + TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && + TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) { + return generateSyntaxErrMsgExt(&pCtx->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, + "%s is only supported on super table, child table or normal table", pFunc->functionName); + } + return TSDB_CODE_SUCCESS; +} + static bool isStar(SNode* pNode) { return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*")); @@ -1720,7 +1740,7 @@ static int32_t rewriteSystemInfoFunc(STranslateContext* pCxt, SNode** pNode) { return TSDB_CODE_PAR_INTERNAL_ERROR; } -static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { +static int32_t translateNormalFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { int32_t code = translateAggFunc(pCxt, pFunc); if (TSDB_CODE_SUCCESS == code) { code = translateScanPseudoColumnFunc(pCxt, pFunc); @@ -1752,6 +1772,9 @@ static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* p if (TSDB_CODE_SUCCESS == code) { code = translateTimelineFunc(pCxt, pFunc); } + if (TSDB_CODE_SUCCESS == code) { + code = translateBlockDistFunc(pCxt, pFunc); + } if (TSDB_CODE_SUCCESS == code) { setFuncClassification(pCxt->pCurrStmt, pFunc); } @@ -1812,7 +1835,7 @@ static int32_t translateFunctionImpl(STranslateContext* pCxt, SFunctionNode** pF if (fmIsClientPseudoColumnFunc((*pFunc)->funcId)) { return rewriteClientPseudoColumnFunc(pCxt, (SNode**)pFunc); } - return translateNoramlFunction(pCxt, *pFunc); + return translateNormalFunction(pCxt, *pFunc); } static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index cf338d63ff43182330d2f1e229aa2a03070feb6a..47482db7400f98c839c112d93e503b20d25f8073 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -27,7 +27,7 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) { const char* pSql = pStr; int32_t index = 0; - SToken t = tStrGetToken((char*)pStr, &index, false); + SToken t = tStrGetToken((char*)pStr, &index, false, NULL); if (TK_INSERT != t.type && TK_IMPORT != t.type) { return false; } @@ -35,7 +35,7 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) { do { pStr += index; index = 0; - t = tStrGetToken((char*)pStr, &index, false); + t = tStrGetToken((char*)pStr, &index, false, NULL); if (TK_USING == t.type || TK_VALUES == t.type || TK_FILE == t.type) { return true; } else if (TK_SELECT == t.type) { diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index e61bcc9ffc0c9627ac8d8e845fb0092cca2a6f3b..18f263cc95f7bb9f611cb0a81343b60e7e36d95b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -510,16 +510,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { SSyncLogStoreData *pData = ths->pLogStore->data; SWal *pWal = pData->pWal; - bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore); int64_t walCommitVer = walGetCommittedVer(pWal); - - if (!isEmpty && ths->commitIndex != walCommitVer) { - sNError(ths, "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", walCommitVer, - ths->commitIndex); - snapStart = walCommitVer + 1; - } else { - snapStart = ths->commitIndex + 1; - } + snapStart = TMAX(ths->commitIndex, walCommitVer) + 1; sNInfo(ths, "snapshot begin index is %" PRId64, snapStart); } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 62d82edeb1a3fc59c7dbf34a9ec40ba2e9eaa9d1..63e88b0a12854780a7c153c7bffcae287dd987bf 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -457,6 +457,11 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { SPgno journalSize = 0; int ret; + if (pTxn->jfd == 0) { + // txn is commited + return 0; + } + // sync the journal file ret = tdbOsFSync(pTxn->jfd); if (ret < 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1a99db5f992d51f5e01ba5120e2099f1d53a307f..ef64e8e040665ca10b7f613c364520d07039311e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1045,6 +1045,12 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); + if (addr == 0xffffffff) { + terrno = TAOS_SYSTEM_ERROR(errno); + tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr()); + return addr; + } + taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } else { addr = *v; @@ -1116,9 +1122,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); + if (ipaddr == 0xffffffff) { + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + + cliHandleExcept(conn); + return; + } + struct sockaddr_in addr; addr.sin_family = AF_INET; - addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); + addr.sin_addr.s_addr = ipaddr; addr.sin_port = (uint16_t)htons((uint16_t)conn->port); STraceId* trace = &(pMsg->msg.info.traceId); diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 6611a937f2486693e5cf82eff7d9d1c7ec1394a1..de628c75963c293d6c25d161f5f05f4af8e10e81 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -944,7 +944,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) { iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); if (iResult != 0) { // printf("WSAStartup failed: %d\n", iResult); - return 1; + return 0xFFFFFFFF; } #endif struct addrinfo hints = {0}; diff --git a/tests/script/tsim/compute/block_dist.sim b/tests/script/tsim/compute/block_dist.sim index 2d0a4e890267af3520e8c99f21a4d87c5bc1ceef..4fdcf63e34f5c5949d87f9b02cc3b620c87bd429 100644 --- a/tests/script/tsim/compute/block_dist.sim +++ b/tests/script/tsim/compute/block_dist.sim @@ -91,6 +91,10 @@ print ============== TD-5998 sql_error select _block_dist() from (select * from $nt) sql_error select _block_dist() from (select * from $mt) +print ============== TD-22140 & TD-22165 +sql_error show table distributed information_schema.ins_databases +sql_error show table distributed performance_schema.perf_apps + print =============== clear sql drop database $db sql select * from information_schema.ins_databases