diff --git a/cmake/cmake.version b/cmake/cmake.version index 5c5abe79bbe322ae561cb71c69ab662e0ea23875..c379d5e7396def36736d1113137883f7ef46b5b8 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.1.7") + SET(TD_VER_NUMBER "3.0.1.8") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index 8bfdf72cc7a848365834e7f6a87c884e46031c05..32bdc21e7c0552f5f891b38f806e48efb6c419ac 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w import Release from "/components/ReleaseV3"; +## 3.0.1.8 + + + ## 3.0.1.7 diff --git a/docs/en/28-releases/02-tools.md b/docs/en/28-releases/02-tools.md index 2bc22a44500ca275c42cd1790074baaabb192cdb..7126b5a997043231d1cf93d633b8cd71e5f6275e 100644 --- a/docs/en/28-releases/02-tools.md +++ b/docs/en/28-releases/02-tools.md @@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat import Release from "/components/ReleaseV3"; +## 2.3.0 + + + ## 2.2.9 diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index cd726e0a0ea644f575e16c656eeb4bb2cabf425d..932ad30b1a949d172d81819f2432daa42ce331c8 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -72,7 +72,7 @@ SHOW STREAMS; 若要展示更详细的信息,可以使用: ```sql -SELECT * from performance_schema.`perf_streams`; +SELECT * from information_schema.`ins_streams`; ``` ## 流式计算的触发模式 diff --git a/docs/zh/20-third-party/01-grafana.mdx b/docs/zh/20-third-party/01-grafana.mdx index 83f3f8bb25de4b99a345bafab7e8a43c3d35f14e..d5cfe847cada26a102b634573e7f4e2492adf60b 100644 --- a/docs/zh/20-third-party/01-grafana.mdx +++ b/docs/zh/20-third-party/01-grafana.mdx @@ -77,7 +77,7 @@ sudo -u grafana grafana-cli plugins install tdengine-datasource 或者从 [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) 或 [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) 下载 .zip 文件到本地并解压到 Grafana 插件目录。命令行下载示例如下: ```bash -GF_VERSION=3.2.2 +GF_VERSION=3.2.7 # from GitHub wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip # from Grafana diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index fd2be899eb475efcb189e0b80b5c6cf180557cb9..7ed9e0c5a018401e2028a1e0786459d4e26f27b6 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -10,6 +10,11 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do import Release from "/components/ReleaseV3"; +## 3.0.1.8 + + + + ## 3.0.1.7 diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index 3f73b53fab46b8d57317c663cde5ffd54e66e1f8..67ca3fae67b36e5f08c57440bbaa64ec4f80bf4e 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下: import Release from "/components/ReleaseV3"; +## 2.3.0 + + + ## 2.2.9 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ecd1b6f916c814fe5ddcb2b72591c80b6e6c450a..4099551188cc1c8e75a01a5bb0dec177ad559da7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -140,15 +140,40 @@ typedef struct { int8_t type; } SStreamCheckpoint; -typedef struct { - int8_t type; -} SStreamTaskDestroy; - typedef struct { int8_t type; SSDataBlock* pBlock; } SStreamTrigger; +typedef struct SStreamQueueNode SStreamQueueNode; + +struct SStreamQueueNode { + SStreamQueueItem* item; + SStreamQueueNode* next; +}; + +typedef struct { + SStreamQueueNode* head; + int64_t size; +} SStreamQueueRes; + +void streamFreeQitem(SStreamQueueItem* data); + +bool streamQueueResEmpty(const SStreamQueueRes* pRes); +int64_t streamQueueResSize(const SStreamQueueRes* pRes); +SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes); +SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes); +void streamQueueResClear(SStreamQueueRes* pRes); +SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode); + +typedef struct { + SStreamQueueNode* pHead; +} SStreamQueue1; + +bool streamQueueHasTask(const SStreamQueue1* pQueue); +int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); +SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); + typedef struct { STaosQueue* queue; STaosQall* qall; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 300251d64d0626859fdb5a79fc7df81bcb016ca4..58f8172282361d5d5a95c6e66b8c6da2cc90b292 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -324,15 +324,15 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SMqHbReq req = {0}; + SMnode *pMnode = pMsg->info.node; + SMqHbReq req = {0}; if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int64_t consumerId = req.consumerId; + int64_t consumerId = req.consumerId; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer %" PRId64 " not exist", consumerId); @@ -363,17 +363,17 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SMqAskEpReq req = {0}; - SMqAskEpRsp rsp = {0}; + SMnode *pMnode = pMsg->info.node; + SMqAskEpReq req = {0}; + SMqAskEpRsp rsp = {0}; if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int64_t consumerId = req.consumerId; - int32_t epoch = req.epoch; + int64_t consumerId = req.consumerId; + int32_t epoch = req.epoch; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { @@ -457,6 +457,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { if (topicEp.vgs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosRUnLockLatch(&pConsumer->lock); + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); goto FAIL; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e8428ea4700741254cabbc43dc3b4380a50f23ad..3c1d3f09bf297363b4b74c8971c50f3d3a64408a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -317,9 +317,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); ASSERT(pDbObj != NULL); - sdbRelease(pSdb, pDbObj); bool multiTarget = pDbObj->cfg.numOfVgroups > 1; + sdbRelease(pSdb, pDbObj); if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { /*if (true) {*/ @@ -451,7 +451,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); if (pEpInfo == NULL) { - ASSERT(0); terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 58c89d76aa41542d4138ed4b638333edc74ef947..ffb46e5f1b7e085d00e9a4f69e667c44a8db9fa7 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -782,6 +782,7 @@ SUB_DECODE_OVER: return NULL; } + mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub); return pRow; } @@ -928,6 +929,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) action.msgType = TDMT_VND_TMQ_DELETE_SUB; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); + sdbRelease(pSdb, pSub); return -1; } } @@ -936,6 +938,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) sdbRelease(pSdb, pSub); goto END; } + + sdbRelease(pSdb, pSub); } code = 0; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 03532eb6d409221c2fa138df419af1dd4a2620d9..75fb5664381269b04121ea700971c223093844ad 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -660,6 +660,13 @@ _end: return code; } +static void tdBlockDataDestroy(SArray *pBlockArr) { + for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) { + blockDataDestroy(taosArrayGetP(pBlockArr, i)); + } + taosArrayDestroy(pBlockArr); +} + static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid) { SArray *pResList = taosArrayInit(1, POINTER_BYTES); @@ -701,38 +708,42 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); - smaDebug("result block, uid:%"PRIu64", groupid:%"PRIu64", rows:%d", output->info.uid, output->info.groupId, - output->info.rows); + smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.uid, output->info.groupId, + output->info.rows); - STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq *pReq = NULL; + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + SSubmitReq *pReq = NULL; // TODO: the schema update should be handled later(TD-17965) if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%"PRIu64", level %" PRIi8 " failed since %s", SMA_VID(pSma), - suid, output->info.groupId, pItem->level, terrstr()); + smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8 + " failed since %s", + SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); goto _err; } if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s", + smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 + " failed since %s", SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); goto _err; } - smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, - SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen)); + smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64 + " len %" PRIu32, + SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, + htonl(pReq->header.contLen)); taosMemoryFreeClear(pReq); } } - taosArrayDestroy(pResList); + tdBlockDataDestroy(pResList); return TSDB_CODE_SUCCESS; _err: - taosArrayDestroy(pResList); + tdBlockDataDestroy(pResList); return TSDB_CODE_FAILED; } @@ -820,8 +831,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) { int32_t idx = level - 1; - - void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx); + void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx); if (!qTaskInfo) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6c1c552ccb56cf6057f1dd2cc8e9c8e9a35414cd..61f027039d3d64b60c9a00be39585465c64cf0a9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -379,7 +379,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - ASSERT(0); return -1; } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4319dd379a5ffcbfe674efb04ccde6136baf29d1..036e14a62154c655cc03cdc586aa34775f298ac0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -82,12 +82,17 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* static void releaseQueryBuf(size_t numOfTables); -static void destroyFillOperatorInfo(void* param); -static void destroyProjectOperatorInfo(void* param); -static void destroySortOperatorInfo(void* param); -static void destroyAggOperatorInfo(void* param); - -static void destroyIntervalOperatorInfo(void* param); +static void destroyFillOperatorInfo(void* param); +static void destroyAggOperatorInfo(void* param); +static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); +static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); +static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, + const char* pKey); +static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, + int32_t status); +static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, + bool createDummyCol); void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; @@ -129,9 +134,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo); -static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); - SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; @@ -362,9 +364,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo } } -static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, - bool createDummyCol); - static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) { SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { @@ -996,9 +995,6 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } } -static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, - int32_t status); - void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { if (pFilterInfo == NULL || pBlock->info.rows == 0) { return; @@ -1564,9 +1560,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t return TSDB_CODE_SUCCESS; } -static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, - const char* pKey); - int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; @@ -1641,11 +1634,16 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } } + // the downstream operator may return with error code, so let's check the code before generating results. + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - return TSDB_CODE_SUCCESS; + return pTaskInfo->code; } static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { @@ -1684,7 +1682,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pInfo->pRes; } -static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index cde8346487d641a5d80e033555c7e507ce0ef42d..82398d6e340b7d5439c6ffe8de21df194c429083 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -915,33 +915,39 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { } pDest->info.rows++; if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) { - SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); - SSDataBlock* pResBlock = createDataBlock(); - pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; - SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); - taosArrayPush(pResBlock->pDataBlock, &data); - blockDataEnsureCapacity(pResBlock, 1); - projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); - ASSERT(pResBlock->info.rows == 1); - ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); - SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); - ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); - void* pData = colDataGetVarData(pCol, 0); - // TODO check tbname validity - if (pData != (void*)-1) { - memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); - memcpy(pDest->info.parTbName, varDataVal(pData), len); - /*pDest->info.parTbName[len + 1] = 0;*/ + void* tbname = NULL; + if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { + memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + tdbFree(tbname); } else { - pDest->info.parTbName[0] = 0; - } - if (pParInfo->groupId && pDest->info.parTbName[0]) { - streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); + SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); + SSDataBlock* pResBlock = createDataBlock(); + pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; + SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); + taosArrayPush(pResBlock->pDataBlock, &data); + blockDataEnsureCapacity(pResBlock, 1); + projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); + ASSERT(pResBlock->info.rows == 1); + ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); + ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); + void* pData = colDataGetVarData(pCol, 0); + // TODO check tbname validity + if (pData != (void*)-1) { + memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); + memcpy(pDest->info.parTbName, varDataVal(pData), len); + /*pDest->info.parTbName[len + 1] = 0;*/ + } else { + pDest->info.parTbName[0] = 0; + } + if (pParInfo->groupId && pDest->info.parTbName[0]) { + streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); + } + /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ + blockDataDestroy(pTmpBlock); + blockDataDestroy(pResBlock); } - /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ - blockDataDestroy(pTmpBlock); - blockDataDestroy(pResBlock); } } taosArrayDestroy(pParInfo->rowIds); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8db450ad50e441059e0fbbf4db7233e89114756e..4c23d17ac79cb6d36521f7590d996997993d930f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -163,8 +163,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, - GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, + buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; @@ -306,7 +306,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; pCost->totalBlocks += 1; @@ -1312,6 +1312,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); varDataSetLen(tbname, strlen(varDataVal(tbname))); + tdbFree(parTbname); } appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, tbname[0] == 0 ? NULL : tbname); @@ -1510,10 +1511,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); - if (code != TSDB_CODE_SUCCESS) { + // ignore the table not exists error, since this table may have been dropped during the scan procedure. + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); T_LONG_JMP(pTaskInfo->env, code); } + + // reset the error code. + terrno = 0; } if (filter) { @@ -1928,6 +1933,7 @@ FETCH_NEXT_BLOCK: if (pInfo->validBlockIndex >= totBlockNum) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); + qDebug("stream scan return empty, consume block %d", totBlockNum); return NULL; } @@ -2562,7 +2568,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { uint32_t status = 0; loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); -// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); + // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2893,7 +2899,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 9908f3581854f6d1e65b4ad3bbeb0c65b4858084..7674b9e479e45303b8c0c46478c96dd15693cedb 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -762,12 +762,10 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI resetPrevAndNextWindow(pFillSup, pState); SWinKey key = {.ts = ts, .groupId = groupId}; - // void* curVal = NULL; int32_t curVLen = 0; int32_t code = streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen); ASSERT(code == TSDB_CODE_SUCCESS); pFillSup->cur.key = key.ts; - // pFillSup->cur.pRowVal = curVal; } void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) { @@ -952,6 +950,19 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS } } +void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { + for (int32_t i = pFillSup->numOfFillCols; i < pFillSup->numOfAllCols; ++i) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + int32_t slotId = GET_DEST_SLOT_ID(pFillCol); + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); + SResultCellData* pCurCell = getResultCell(&pFillSup->cur, slotId); + pCell->isNull = pCurCell->isNull; + if (!pCurCell->isNull) { + memcpy(pCell->pData, pCurCell->pData, pCell->bytes); + } + } +} + void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { pFillInfo->preRowKey = pFillSup->cur.key; @@ -993,6 +1004,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; } + copyNotFillExpData(pFillSup, pFillInfo); } break; case TSDB_FILL_PREV: { if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) || diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 35f50cebca699fb215c5c18626bfeebc86bb5a2e..622baa76c998180bf237ec8096f34c5a15c7f7b6 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3376,7 +3376,8 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { int64_t* pts = (int64_t*)pInput->pPTS->pData; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - char* data = colDataGetData(pInputCol, i); + bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL); + char* data = isNull ? NULL : colDataGetData(pInputCol, i); TSKEY cts = pts[i]; numOfElems++; diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 075408f1b34954c215cc9fc11fbc08b384014001..2a2865a955f8856a6725a50a0b604fdf1304d6c9 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -699,8 +699,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou } else { for (int32_t m = 0; m < node->pParameterList->length; m++) { output->status = sifMergeCond(node->condType, output->status, params[m].status); - taosArrayDestroy(params[m].result); - params[m].result = NULL; + // taosArrayDestroy(params[m].result); + // params[m].result = NULL; } } _return: @@ -857,9 +857,15 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } *status = res->status; - sifFreeParam(res); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); + + void *iter = taosHashIterate(ctx.pRes, NULL); + while (iter != NULL) { + SIFParam *data = (SIFParam *)iter; + sifFreeParam(data); + iter = taosHashIterate(ctx.pRes, iter); + } taosHashCleanup(ctx.pRes); return code; } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 44f792869e4527188a2faa6ebb224e9ab72b2345..d1271e929056cd32665a89bedb6220de0b1739e1 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -603,7 +603,7 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell bool *equal = (bool *)colDataGetData(pComp->columnData, rowIdx); if (*equal) { - bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); + bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); colDataAppend(output->columnData, rowIdx, pData, isNull); @@ -617,7 +617,7 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell } if (pElse) { - bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); + bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); colDataAppend(output->columnData, rowIdx, pData, isNull); @@ -666,7 +666,7 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCe bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0)); if (*whenValue) { - bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); + bool isNull = colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)); colDataAppend(output->columnData, rowIdx, pData, isNull); @@ -685,7 +685,7 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCe } if (pElse) { - bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); + bool isNull = colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); char *pData = isNull ? NULL : colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)); colDataAppend(output->columnData, rowIdx, pData, isNull); @@ -1210,6 +1210,7 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) { SScalarParam output = {0}; ctx->code = sclExecOperator(node, ctx, &output); if (ctx->code) { + sclFreeParam(&output); return DEAL_RES_ERROR; } @@ -1358,6 +1359,7 @@ EDealRes sclWalkOperator(SNode *pNode, SScalarCtx *ctx) { ctx->code = sclExecOperator(node, ctx, &output); if (ctx->code) { + sclFreeParam(&output); return DEAL_RES_ERROR; } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 4cf4862136592fc5f367e56b2585dc6c6dbe0162..95d22044d7b3624fee7cb01a8742f32fa6bba415 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -343,11 +343,11 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn int32_t inputLen = varDataLen(buf); int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; - char *t = taosMemoryCalloc(1, outputMaxLen); - int32_t ret = taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), - outputMaxLen - VARSTR_HEADER_SIZE, &len); + char *t = taosMemoryCalloc(1, outputMaxLen); + int32_t ret = + taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), outputMaxLen - VARSTR_HEADER_SIZE, &len); if (!ret) { - sclError("failed to convert to NCHAR"); + sclError("failed to convert to NCHAR"); } varDataSetLen(t, len); @@ -370,8 +370,8 @@ static FORCE_INLINE void ncharToVar(char *buf, SScalarParam *pOut, int32_t rowIn taosMemoryFree(t); } -//TODO opt performance, tmp is not needed. -int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) { +// TODO opt performance, tmp is not needed. +int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { bool vton = false; _bufConverteFunc func = NULL; @@ -383,11 +383,11 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) { func = varToUnsigned; } else if (IS_FLOAT_TYPE(pCtx->outType)) { func = varToFloat; - } else if (pCtx->outType == TSDB_DATA_TYPE_BINARY) { // nchar -> binary + } else if (pCtx->outType == TSDB_DATA_TYPE_BINARY) { // nchar -> binary ASSERT(pCtx->inType == TSDB_DATA_TYPE_NCHAR); func = ncharToVar; vton = true; - } else if (pCtx->outType == TSDB_DATA_TYPE_NCHAR) { // binary -> nchar + } else if (pCtx->outType == TSDB_DATA_TYPE_NCHAR) { // binary -> nchar ASSERT(pCtx->inType == TSDB_DATA_TYPE_VARCHAR); func = varToNchar; vton = true; @@ -405,10 +405,10 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) { continue; } - char* data = colDataGetVarData(pCtx->pIn->columnData, i); + char *data = colDataGetVarData(pCtx->pIn->columnData, i); int32_t convertType = pCtx->inType; - if(pCtx->inType == TSDB_DATA_TYPE_JSON){ - if(*data == TSDB_DATA_TYPE_NULL) { + if (pCtx->inType == TSDB_DATA_TYPE_JSON) { + if (*data == TSDB_DATA_TYPE_NULL) { ASSERT(0); } else if (*data == TSDB_DATA_TYPE_NCHAR) { data += CHAR_BYTES; @@ -417,13 +417,13 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) { terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR; return terrno; } else { - convertNumberToNumber(data+CHAR_BYTES, colDataGetNumData(pCtx->pOut->columnData, i), *data, pCtx->outType); + convertNumberToNumber(data + CHAR_BYTES, colDataGetNumData(pCtx->pOut->columnData, i), *data, pCtx->outType); continue; } } int32_t bufSize = pCtx->pIn->columnData->info.bytes; - char *tmp = taosMemoryMalloc(varDataTLen(data)); - if(!tmp){ + char *tmp = taosMemoryMalloc(varDataTLen(data)); + if (!tmp) { sclError("out of memory in vectorConvertFromVarData"); return TSDB_CODE_OUT_OF_MEMORY; } @@ -446,7 +446,7 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) { tmp[len] = 0; } } - + (*func)(tmp, pCtx->pOut, i, overflow); taosMemoryFreeClear(tmp); } @@ -584,11 +584,12 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t } int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) { - SColumnInfoData* pInputCol = pCtx->pIn->columnData; - SColumnInfoData* pOutputCol = pCtx->pOut->columnData; - char tmp[128] = {0}; + SColumnInfoData *pInputCol = pCtx->pIn->columnData; + SColumnInfoData *pOutputCol = pCtx->pOut->columnData; + char tmp[128] = {0}; - if (IS_SIGNED_NUMERIC_TYPE(pCtx->inType) || pCtx->inType == TSDB_DATA_TYPE_BOOL || pCtx->inType == TSDB_DATA_TYPE_TIMESTAMP) { + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inType) || pCtx->inType == TSDB_DATA_TYPE_BOOL || + pCtx->inType == TSDB_DATA_TYPE_TIMESTAMP) { for (int32_t i = pCtx->startIndex; i <= pCtx->endIndex; ++i) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { colDataAppendNULL(pOutputCol, i); @@ -648,17 +649,18 @@ int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) { } // TODO opt performance -int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, int32_t* overflow, int32_t startIndex, int32_t numOfRows) { - SColumnInfoData* pInputCol = pIn->columnData; - SColumnInfoData* pOutputCol = pOut->columnData; +int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex, + int32_t numOfRows) { + SColumnInfoData *pInputCol = pIn->columnData; + SColumnInfoData *pOutputCol = pOut->columnData; if (NULL == pInputCol) { sclError("input column is NULL, hashFilter %p", pIn->pHashFilter); return TSDB_CODE_APP_ERROR; } - int32_t rstart = (startIndex >= 0 && startIndex < pIn->numOfRows) ? startIndex : 0; - int32_t rend = numOfRows > 0 ? rstart + numOfRows - 1 : rstart + pIn->numOfRows - 1; + int32_t rstart = (startIndex >= 0 && startIndex < pIn->numOfRows) ? startIndex : 0; + int32_t rend = numOfRows > 0 ? rstart + numOfRows - 1 : rstart + pIn->numOfRows - 1; SSclVectorConvCtx cCtx = {pIn, pOut, rstart, rend, pInputCol->info.type, pOutputCol->info.type}; if (IS_VAR_DATA_TYPE(cCtx.inType)) { @@ -669,14 +671,14 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, ASSERT(1 == pIn->numOfRows); pOut->numOfRows = 0; - + if (IS_SIGNED_NUMERIC_TYPE(cCtx.outType)) { int64_t minValue = tDataTypes[cCtx.outType].minValue; int64_t maxValue = tDataTypes[cCtx.outType].maxValue; - + double value = 0; GET_TYPED_DATA(value, double, cCtx.inType, colDataGetData(pInputCol, 0)); - + if (value > maxValue) { *overflow = 1; return TSDB_CODE_SUCCESS; @@ -689,10 +691,10 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, } else if (IS_UNSIGNED_NUMERIC_TYPE(cCtx.outType)) { uint64_t minValue = (uint64_t)tDataTypes[cCtx.outType].minValue; uint64_t maxValue = (uint64_t)tDataTypes[cCtx.outType].maxValue; - + double value = 0; GET_TYPED_DATA(value, double, cCtx.inType, colDataGetData(pInputCol, 0)); - + if (value > maxValue) { *overflow = 1; return TSDB_CODE_SUCCESS; @@ -733,7 +735,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, } break; } - case TSDB_DATA_TYPE_SMALLINT:{ + case TSDB_DATA_TYPE_SMALLINT: { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { colDataAppendNULL(pOutputCol, i); @@ -746,7 +748,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, } break; } - case TSDB_DATA_TYPE_INT:{ + case TSDB_DATA_TYPE_INT: { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { colDataAppendNULL(pOutputCol, i); @@ -773,7 +775,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, } break; } - case TSDB_DATA_TYPE_UTINYINT:{ + case TSDB_DATA_TYPE_UTINYINT: { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { colDataAppendNULL(pOutputCol, i); @@ -786,7 +788,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, } break; } - case TSDB_DATA_TYPE_USMALLINT:{ + case TSDB_DATA_TYPE_USMALLINT: { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { colDataAppendNULL(pOutputCol, i); @@ -799,7 +801,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, } break; } - case TSDB_DATA_TYPE_UINT:{ + case TSDB_DATA_TYPE_UINT: { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { colDataAppendNULL(pOutputCol, i); @@ -821,11 +823,11 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, uint64_t value = 0; GET_TYPED_DATA(value, uint64_t, cCtx.inType, colDataGetData(pInputCol, i)); - colDataAppendInt64(pOutputCol, i, (int64_t*)&value); + colDataAppendInt64(pOutputCol, i, (int64_t *)&value); } break; } - case TSDB_DATA_TYPE_FLOAT:{ + case TSDB_DATA_TYPE_FLOAT: { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { colDataAppendNULL(pOutputCol, i); @@ -834,7 +836,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, float value = 0; GET_TYPED_DATA(value, float, cCtx.inType, colDataGetData(pInputCol, i)); - colDataAppendFloat(pOutputCol, i, (float*)&value); + colDataAppendFloat(pOutputCol, i, (float *)&value); } break; } @@ -847,7 +849,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, double value = 0; GET_TYPED_DATA(value, double, cCtx.inType, colDataGetData(pInputCol, i)); - colDataAppendDouble(pOutputCol, i, (double*)&value); + colDataAppendDouble(pOutputCol, i, (double *)&value); } break; } @@ -865,25 +867,25 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB + 1][TSDB_DATA_TYPE_BLOB + 1] = { /* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB */ - /*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 7, 11, 12, 13, 14, 0, 7, 0, 0, - /*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, - /*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, - /*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 5, 9, 7, 4, 4, 5, 7, 0, 7, 0, 0, - /*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 5, 9, 7, 5, 5, 5, 7, 0, 7, 0, 0, - /*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0, - /*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0, - /*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 7, 7, 7, 7, 0, 0, 0, 0, - /*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0, - /*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, - /*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0, - /*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0, - /*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 7, 0, 0, - /*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, - /*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + /*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + /*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 7, 11, 12, 13, 14, 0, 7, 0, 0, + /*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, + /*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, + /*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 5, 9, 7, 4, 4, 5, 7, 0, 7, 0, 0, + /*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 5, 9, 7, 5, 5, 5, 7, 0, 7, 0, 0, + /*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0, + /*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0, + /*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 7, 7, 7, 7, 0, 0, 0, 0, + /*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0, + /*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, + /*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0, + /*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0, + /*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 7, 0, 0, + /*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, + /*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + /*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + /*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + /*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; int32_t vectorGetConvertType(int32_t type1, int32_t type2) { if (type1 == type2) { @@ -897,7 +899,8 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) { return gConvertTypes[type2][type1]; } -int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex, int32_t numOfRows) { +int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex, + int32_t numOfRows) { SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; output->numOfRows = input->numOfRows; @@ -914,8 +917,9 @@ int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_ return TSDB_CODE_SUCCESS; } -int32_t vectorConvertCols(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut, int32_t startIndex, int32_t numOfRows) { - int32_t leftType = GET_PARAM_TYPE(pLeft); +int32_t vectorConvertCols(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pLeftOut, SScalarParam *pRightOut, + int32_t startIndex, int32_t numOfRows) { + int32_t leftType = GET_PARAM_TYPE(pLeft); int32_t rightType = GET_PARAM_TYPE(pRight); if (leftType == rightType) { return TSDB_CODE_SUCCESS; @@ -1007,9 +1011,9 @@ static void vectorMathTsAddHelper(SColumnInfoData *pLeftCol, SColumnInfoData *pR } } -static SColumnInfoData* vectorConvertVarToDouble(SScalarParam* pInput, int32_t* converted) { - SScalarParam output = {0}; - SColumnInfoData* pCol = pInput->columnData; +static SColumnInfoData *vectorConvertVarToDouble(SScalarParam *pInput, int32_t *converted) { + SScalarParam output = {0}; + SColumnInfoData *pCol = pInput->columnData; if (IS_VAR_DATA_TYPE(pCol->info.type) && pCol->info.type != TSDB_DATA_TYPE_JSON) { int32_t code = vectorConvertSingleCol(pInput, &output, TSDB_DATA_TYPE_DOUBLE, -1, -1); @@ -1024,7 +1028,7 @@ static SColumnInfoData* vectorConvertVarToDouble(SScalarParam* pInput, int32_t* } *converted = VECTOR_UN_CONVERT; - + return pInput->columnData; } @@ -1043,9 +1047,9 @@ void vectorMathAdd(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows); - int32_t leftConvert = 0, rightConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); - SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); + int32_t leftConvert = 0, rightConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); if ((GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_TIMESTAMP && IS_INTEGER_TYPE(GET_PARAM_TYPE(pRight))) || (GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_TIMESTAMP && IS_INTEGER_TYPE(GET_PARAM_TYPE(pLeft))) || @@ -1150,9 +1154,9 @@ void vectorMathSub(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int32_t leftConvert = 0, rightConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); - SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); + int32_t leftConvert = 0, rightConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); if ((GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_TIMESTAMP && GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_BIGINT) || (GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_TIMESTAMP && @@ -1228,9 +1232,9 @@ void vectorMathMultiply(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int32_t leftConvert = 0, rightConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); - SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); + int32_t leftConvert = 0, rightConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); _getDoubleValue_fn_t getVectorDoubleValueFnLeft = getVectorDoubleValueFn(pLeftCol->info.type); _getDoubleValue_fn_t getVectorDoubleValueFnRight = getVectorDoubleValueFn(pRightCol->info.type); @@ -1261,8 +1265,8 @@ void vectorMathDivide(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *p int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int32_t leftConvert = 0, rightConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + int32_t leftConvert = 0, rightConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); _getDoubleValue_fn_t getVectorDoubleValueFnLeft = getVectorDoubleValueFn(pLeftCol->info.type); @@ -1315,8 +1319,8 @@ void vectorMathRemainder(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int32_t leftConvert = 0, rightConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + int32_t leftConvert = 0, rightConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); _getDoubleValue_fn_t getVectorDoubleValueFnLeft = getVectorDoubleValueFn(pLeftCol->info.type); @@ -1394,8 +1398,8 @@ void vectorMathMinus(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pO int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : (pLeft->numOfRows - 1); int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int32_t leftConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + int32_t leftConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); _getDoubleValue_fn_t getVectorDoubleValueFnLeft = getVectorDoubleValueFn(pLeftCol->info.type); @@ -1456,9 +1460,9 @@ void vectorBitAnd(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int32_t leftConvert = 0, rightConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); - SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); + int32_t leftConvert = 0, rightConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); _getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type); _getBigintValue_fn_t getVectorBigintValueFnRight = getVectorBigintValueFn(pRightCol->info.type); @@ -1510,9 +1514,9 @@ void vectorBitOr(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - int32_t leftConvert = 0, rightConvert = 0; - SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); - SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); + int32_t leftConvert = 0, rightConvert = 0; + SColumnInfoData *pLeftCol = vectorConvertVarToDouble(pLeft, &leftConvert); + SColumnInfoData *pRightCol = vectorConvertVarToDouble(pRight, &rightConvert); _getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type); _getBigintValue_fn_t getVectorBigintValueFnRight = getVectorBigintValueFn(pRightCol->info.type); @@ -1536,8 +1540,8 @@ void vectorBitOr(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, doReleaseVec(pRightCol, rightConvert); } -int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows, - int32_t step, __compar_fn_t fp, int32_t optr) { +int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex, + int32_t numOfRows, int32_t step, __compar_fn_t fp, int32_t optr) { int32_t num = 0; for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) { @@ -1590,15 +1594,15 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa return num; } -void doVectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows, - int32_t _ord, int32_t optr) { +void doVectorCompare(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex, + int32_t numOfRows, int32_t _ord, int32_t optr) { int32_t i = 0; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; int32_t lType = GET_PARAM_TYPE(pLeft); int32_t rType = GET_PARAM_TYPE(pRight); __compar_fn_t fp = NULL; int32_t compRows = 0; - + if (lType == rType) { fp = filterGetCompFunc(lType, optr); } else { @@ -1634,10 +1638,10 @@ void doVectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO } } -void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows, - int32_t _ord, int32_t optr) { - SScalarParam pLeftOut = {0}; - SScalarParam pRightOut = {0}; +void vectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex, + int32_t numOfRows, int32_t _ord, int32_t optr) { + SScalarParam pLeftOut = {0}; + SScalarParam pRightOut = {0}; SScalarParam *param1 = NULL; SScalarParam *param2 = NULL; @@ -1661,16 +1665,16 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam * } doVectorCompare(param1, param2, pOut, startIndex, numOfRows, _ord, optr); - + sclFreeParam(&pLeftOut); sclFreeParam(&pRightOut); } -void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) { +void vectorCompare(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) { vectorCompareImpl(pLeft, pRight, pOut, -1, -1, _ord, optr); } -void vectorGreater(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { +void vectorGreater(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) { vectorCompare(pLeft, pRight, pOut, _ord, OP_TYPE_GREATER_THAN); } @@ -1734,10 +1738,10 @@ void vectorNotNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut pOut->numOfRows = pLeft->numOfRows; } -void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { +void vectorIsTrue(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) { vectorConvertSingleColImpl(pLeft, pOut, NULL, -1, -1); - for(int32_t i = 0; i < pOut->numOfRows; ++i) { - if(colDataIsNull_s(pOut->columnData, i)) { + for (int32_t i = 0; i < pOut->numOfRows; ++i) { + if (colDataIsNull_s(pOut->columnData, i)) { int8_t v = 0; colDataAppendInt8(pOut->columnData, i, &v); colDataSetNotNull_f(pOut->columnData->nullbitmap, i); @@ -1748,7 +1752,7 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, STagVal getJsonValue(char *json, char *key, bool *isExist) { STagVal val = {.pKey = key}; - if (tTagIsJson((const STag *)json) == false) { + if (json == NULL || tTagIsJson((const STag *)json) == false) { terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR; if (isExist) { *isExist = false; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 0fc75c4798c2e24f92893a763077dea373ccade1..5ff49502df60401e1f08ea7aeeaf37f66e717e19 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -47,7 +47,6 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SEpSet* pEpSet); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); -void streamFreeQitem(SStreamQueueItem* data); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index ac10c8258744f178bd2010543176f545b40c88b6..7eafcdc93ea20b96140391c0e555ff5caf5bff93 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -45,3 +45,59 @@ void streamQueueClose(SStreamQueue* queue) { taosCloseQueue(queue->queue); taosMemoryFree(queue); } + +bool streamQueueResEmpty(const SStreamQueueRes* pRes) { + // + return true; +} +int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; } +SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; } +SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) { + SStreamQueueNode* pRet = pRes->head; + pRes->head = pRes->head->next; + return pRet; +} + +void streamQueueResClear(SStreamQueueRes* pRes) { + while (pRes->head) { + SStreamQueueNode* pNode = pRes->head; + streamFreeQitem(pRes->head->item); + pRes->head = pNode; + } +} + +SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) { + int64_t size = 0; + SStreamQueueNode* head = NULL; + + while (pTail) { + SStreamQueueNode* pTmp = pTail->next; + pTail->next = head; + head = pTail; + pTail = pTmp; + size++; + } + + return (SStreamQueueRes){.head = head, .size = size}; +} + +bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); } +int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) { + SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode)); + pNode->item = pItem; + SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead); + while (1) { + pNode->next = pHead; + SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode); + if (pOld == pHead) { + break; + } + } + return 0; +} + +SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { + SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL); + if (pNode) return streamQueueBuildRes(pNode); + return (SStreamQueueRes){0}; +} diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index aa8d3bef517908f402667629bfa023c2045e18d1..e882f7461d418844616424943cdb9bcd5f32ea66 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -70,6 +70,7 @@ typedef struct SSyncTimer { uint64_t logicClock; uint64_t counter; int32_t timerMS; + int64_t timeStamp; SRaftId destId; int64_t hbDataRid; } SSyncTimer; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 7ceec29be4d0c74b6b86b95e5f5a64ee9cc83fc2..6535f77fbec3406b624a77506ca0c401cb2af673 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -35,6 +35,7 @@ typedef struct SyncTimeout { ESyncTimeoutType timeoutType; uint64_t logicClock; int32_t timerMS; + int64_t timeStamp; void* data; // need optimized } SyncTimeout; diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index b7ee320aa52a84e5f6a4557b2d8bee4188a320e8..8c0793a9ea6d5d13435568d0b95ebe940b873682 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -94,11 +94,11 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed); +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff); void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); -void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff); void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); @@ -115,7 +115,7 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); -void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s); void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2aaa13f95d529447bdaf3542c1eae8b2b91d9e7b..1d7dcb1790dc2b48ec4ef7efc0935aa920e0686f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -691,6 +691,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa pSyncTimer->timerMS = pSyncNode->hbBaseLine; pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer; pSyncTimer->destId = destId; + pSyncTimer->timeStamp = taosGetTimestampMs(); atomic_store_64(&pSyncTimer->logicClock, 0); return 0; } @@ -704,6 +705,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->rid = syncHbTimerDataAdd(pData); } pSyncTimer->hbDataRid = pData->rid; + pSyncTimer->timeStamp = taosGetTimestampMs(); pData->syncNodeRid = pSyncNode->rid; pData->pTimer = pSyncTimer; @@ -1897,7 +1899,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { return; } - sTrace("enqueue ping msg"); + // sTrace("enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to sync enqueue ping msg since %s", terrstr()); @@ -2032,6 +2034,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); + // update reset time + int64_t tsNow = taosGetTimestampMs(); + int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; + pSyncTimer->timeStamp = tsNow; + SyncHeartbeat* pSyncMsg = rpcMsg.pCont; pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->destId = pData->destId; @@ -2039,9 +2046,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; - pSyncMsg->timeStamp = taosGetTimestampMs(); + pSyncMsg->timeStamp = tsNow; // send msg + syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } else { @@ -2151,9 +2159,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeat* pMsg = pRpcMsg->pCont; int64_t tsMs = taosGetTimestampMs(); - char buf[128]; - snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); - syncLogRecvHeartbeat(ths, pMsg, buf); + int64_t timeDiff = tsMs - pMsg->timeStamp; + syncLogRecvHeartbeat(ths, pMsg, timeDiff); SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); @@ -2163,7 +2170,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->srcId = ths->myRaftId; pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->privateTerm = 8864; // magic number - pMsgReply->timeStamp = taosGetTimestampMs(); + pMsgReply->timeStamp = tsMs; if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); @@ -2229,9 +2236,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsg = pRpcMsg->pCont; int64_t tsMs = taosGetTimestampMs(); - char buf[128]; - snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); - syncLogRecvHeartbeatReply(ths, pMsg, buf); + int64_t timeDiff = tsMs - pMsg->timeStamp; + syncLogRecvHeartbeatReply(ths, pMsg, timeDiff); // update last reply time, make decision whether the other node is alive or not syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index ce984199803fa07512cb0aee57bc95126dd9f820..28a8a2e9954071933e358e750aa16fc7065c5374 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -35,6 +35,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l pTimeout->timeoutType = timeoutType; pTimeout->logicClock = logicClock; pTimeout->timerMS = timerMS; + pTimeout->timeStamp = taosGetTimestampMs(); pTimeout->data = pNode; return 0; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 54c29febe5624a8be0068ef1df635a0bee01ed73..9dbd9fb370777401787e2746dc73661fcc726ff7 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -208,7 +208,6 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest } int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { - syncLogSendHeartbeat(pSyncNode, pMsg->pCont, ""); return syncNodeSendMsgById(destId, pSyncNode, pMsg); } @@ -231,6 +230,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { pSyncMsg->timeStamp = ts; // send msg + syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 8ffc22ee255db30c81a20e7fd06fc629b43e1da9..5c38ba1c1a4dc0aefeb00d91e9b321fbd9033771 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -94,7 +94,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { - syncLogRecvRequestVote(ths, pMsg, "not in my config"); + syncLogRecvRequestVote(ths, pMsg, -1, "not in my config"); return -1; } @@ -133,13 +133,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pReply->voteGranted = grant; // trace log - do { - char logBuf[32]; - snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted); - syncLogRecvRequestVote(ths, pMsg, logBuf); - syncLogSendRequestVoteReply(ths, pReply, ""); - } while (0); - + syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); + syncLogSendRequestVoteReply(ths, pReply, ""); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1e5a268e9768060bc0fcddfdcae6f887f1610924..ee383baac0eae2204035498a92d65852938ab329 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -396,16 +396,25 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df } void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", - syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); + if (!(sDebugFlag & DEBUG_TRACE)) return; + + int64_t tsNow = taosGetTimestampMs(); + int64_t timeDIff = tsNow - pMsg->timeStamp; + sNTrace( + pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s", + syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, timeDIff, pMsg->data, s); } void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); } void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -417,6 +426,8 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries } void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -427,28 +438,42 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); + if (printX) { + sNTrace(pSyncNode, + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 + "}, x", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp); + } else { + sNTrace(pSyncNode, + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 + "}, timer-elapsed:%" PRId64, + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed); + } } -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); + "}, net elapsed:%" PRId64, + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff); } void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -457,15 +482,19 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p pMsg->term, pMsg->timeStamp, s); } -void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->timeStamp, s); + sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, net elapsed:%" PRId64, + host, port, pMsg->term, pMsg->timeStamp, timeDiff); } void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -473,6 +502,8 @@ void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs } void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -480,6 +511,8 @@ void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs } void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -488,6 +521,8 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot } void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -496,6 +531,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot } void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -507,6 +544,8 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p } void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -519,6 +558,8 @@ void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p } void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -530,6 +571,8 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs } void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -541,6 +584,8 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs } void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -553,6 +598,8 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs } void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -563,16 +610,28 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs pMsg->dataLen, s); } -void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char logBuf[256]; char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); + + if (voteGranted == -1) { + sNTrace(pSyncNode, + "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, + port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); + } else { + sNTrace(pSyncNode, + "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, granted:%d", + host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted); + } } void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -581,6 +640,8 @@ void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const } void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -589,6 +650,8 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl } void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f093d84db6db31098703bf87b5e0c8ac42c929b3..b7fe404a4ed8ffdd3cfd92f1fcb06b8caba05a9b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -195,7 +195,7 @@ static bool uvHandleReq(SSvrConn* pConn) { } if (transDecompressMsg((char**)&pHead, msgLen) < 0) { - tDebug("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); + tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); return false; } @@ -277,10 +277,8 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; - tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); if (true == pBuf->invalid || false == uvHandleReq(conn)) { tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pTransInst), conn, conn->dst, conn->src); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 19dcec442a075ff511393e7b01ad6aa18d15dadd..2da4324b7c858facc19092d6603b7ceb74c29c24 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -418,18 +418,18 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,,system-test,python3 ./test.py -f 1-insert/alter_database.py -,,,system-test,python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py ,,,system-test,python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py -,,,system-test,python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py -,,,system-test,python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py -#,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py ,,n,system-test,python3 ./test.py -f 1-insert/boundary.py ,,n,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_comment.py ,,n,system-test,python3 ./test.py -f 1-insert/time_range_wise.py -,,,system-test,python3 ./test.py -f 1-insert/block_wise.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/block_wise.py ,,,system-test,python3 ./test.py -f 1-insert/create_retentions.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/mutil_stage.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_param_ttl.py @@ -622,7 +622,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/union1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py -,,,system-test,python3 ./test.py -f 2-query/json_tag.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py ,,,system-test,python3 ./test.py -f 2-query/nestedQuery.py ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_str.py ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_math.py @@ -670,23 +670,23 @@ ,,,system-test,python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3 -,,,system-test,python3 ./test.py -f 7-tmq/basic5.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb0.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb1.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb2.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py ,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb3.py ,,,system-test,python3 ./test.py -f 7-tmq/subscribeDb4.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb0.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb1.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb2.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb1.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb2.py ,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb3.py -,,,system-test,python3 ./test.py -f 7-tmq/subscribeStb4.py -,,,system-test,python3 ./test.py -f 7-tmq/db.py -,,,system-test,python3 ./test.py -f 7-tmq/tmqError.py -,,,system-test,python3 ./test.py -f 7-tmq/schema.py -,,,system-test,python3 ./test.py -f 7-tmq/stbFilter.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb4.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/db.py +,,n,system-test,python3 ./test.py -f 7-tmq/tmqError.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/schema.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilter.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData1.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqConsumerGroup.py @@ -709,8 +709,8 @@ ,,,system-test,python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py -,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py -,,,system-test,python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDropStb.py @@ -718,8 +718,8 @@ ,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot0.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot1.py ,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf.py -,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py -,,,system-test,python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py ,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py ,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,,system-test,python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py @@ -768,7 +768,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 2 -,,,system-test,python3 ./test.py -f 2-query/json_tag.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 2 @@ -862,7 +862,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 3 -,,,system-test,python3 ./test.py -f 2-query/json_tag.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 3 @@ -978,8 +978,8 @@ ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,,system-test,python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 -,,,system-test,python3 ./test.py -f 2-query/stablity.py -Q 4 -,,,system-test,python3 ./test.py -f 2-query/stablity_1.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 4 diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim index 274476e17c44fb7562fef7d92134b30ec4289263..cae7a66589bac72c352ed855902f3b23bb798046 100644 --- a/tests/script/tsim/show/basic.sim +++ b/tests/script/tsim/show/basic.sim @@ -195,7 +195,7 @@ sql select * from information_schema.ins_stables if $rows != 1 then return -1 endi -#sql select * from performance_schema.perf_streams +#sql select * frominformation_schema.ins_streams sql select * from information_schema.ins_tables if $rows <= 0 then return -1 diff --git a/tests/script/tsim/stream/fillIntervalValue.sim b/tests/script/tsim/stream/fillIntervalValue.sim index 89590d1be0b9f5866aaa7ef48e71f782ad78ac08..fe4ec759eb2fa9ea0520c3c03766a83eacd5fe6f 100644 --- a/tests/script/tsim/stream/fillIntervalValue.sim +++ b/tests/script/tsim/stream/fillIntervalValue.sim @@ -403,23 +403,46 @@ sql drop database if exists test4; sql create database test4 vgroups 1; sql use test4; -sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));; -sql create stream streams4 trigger at_once into streamt4 as select _wstart ts, count(*) c1 from t1 where ts > 1648791210000 and ts < 1648791413000 interval(10s) fill(NULL); +sql create stable st(ts timestamp,a int,b int,c int, d double, s varchar(20) ) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams4 trigger at_once into streamt4 as select _wstart ts, count(*) c1, concat(tbname, 'aaa') as pname, timezone() from st where ts > 1648791000000 and ts < 1648793000000 partition by tbname interval(10s) fill(NULL); sql insert into t1 values(1648791213000,1,2,3,1.0,'aaa'); sql insert into t1 values(1648791233000,1,2,3,1.0,'aaa'); +sql insert into t1 values(1648791273000,1,2,3,1.0,'aaa'); + +sql insert into t2 values(1648791213000,1,2,3,1.0,'bbb'); +sql insert into t2 values(1648791233000,1,2,3,1.0,'bbb'); +sql insert into t2 values(1648791273000,1,2,3,1.0,'bbb'); $loop_count = 0 loop4: sleep 200 -sql select * from streamt4 order by ts; +sql select * from streamt4 order by pname, ts; + +print ===> $data[0][0] , $data[0][1] , $data[0][2] , $data[0][3] +print ===> $data[1][0] , $data[1][1] , $data[1][2] , $data[1][3] +print ===> $data[2][0] , $data[2][1] , $data[2][2] , $data[2][3] +print ===> $data[3][0] , $data[3][1] , $data[3][2] , $data[3][3] +print ===> $data[4][0] , $data[4][1] , $data[4][2] , $data[4][3] +print ===> $data[5][0] , $data[5][1] , $data[5][2] , $data[5][3] +print ===> $data[6][0] , $data[6][1] , $data[6][2] , $data[6][3] +print ===> $data[7][0] , $data[7][1] , $data[7][2] , $data[7][3] +print ===> $data[8][0] , $data[8][1] , $data[8][2] , $data[8][3] +print ===> $data[9][0] , $data[9][1] , $data[9][2] , $data[9][3] +print ===> $data[10][0] , $data[10][1] , $data[10][2] , $data[10][3] +print ===> $data[11][0] , $data[11][1] , $data[11][2] , $data[11][3] +print ===> $data[12][0] , $data[12][1] , $data[12][2] , $data[12][3] +print ===> $data[13][0] , $data[13][1] , $data[13][2] , $data[13][3] $loop_count = $loop_count + 1 if $loop_count == 10 then return -1 endi -if $rows != 3 then +if $rows != 14 then print =====rows=$rows goto loop4 endi @@ -429,6 +452,67 @@ if $data11 != NULL then goto loop4 endi +if $data12 != t1aaa then + print =====data12=$data12 + goto loop4 +endi + +if $data13 == NULL then + print =====data13=$data13 + goto loop4 +endi + +if $data32 != t1aaa then + print =====data32=$data32 + goto loop4 +endi + +if $data42 != t1aaa then + print =====data42=$data42 + goto loop4 +endi + +if $data52 != t1aaa then + print =====data52=$data52 + goto loop4 +endi + +if $data81 != NULL then + print =====data81=$data81 + goto loop4 +endi + +if $data82 != t2aaa then + print =====data82=$data82 + goto loop4 +endi + +if $data83 == NULL then + print =====data83=$data83 + goto loop4 +endi + +if $data[10][2] != t2aaa then + print =====data[10][2]=$data[10][2] + goto loop4 +endi + +if $data[11][2] != t2aaa then + print =====data[11][2]=$data[11][2] + goto loop4 +endi + +if $data[12][2] != t2aaa then + print =====data[12][2]=$data[12][2] + goto loop4 +endi + +if $data[12][3] == NULL then + print =====data[12][3]=$data[12][3] + goto loop4 +endi + +