diff --git a/docs/en/05-get-started/index.md b/docs/en/05-get-started/index.md index 251581e98fbe3246d5c92cbc1d63cea432717f9e..12cfa22c69b1db8e82de4cb251ca3d8c67fc4546 100644 --- a/docs/en/05-get-started/index.md +++ b/docs/en/05-get-started/index.md @@ -8,6 +8,7 @@ import DiscordSVG from './discord.svg' import TwitterSVG from './twitter.svg' import YouTubeSVG from './youtube.svg' import LinkedInSVG from './linkedin.svg' +import StackOverflowSVG from './stackoverflow.svg' You can install and run TDengine on Linux/Windows/macOS machines as well as Docker containers. You can also deploy TDengine as a managed service with TDengine Cloud. @@ -35,10 +36,19 @@ The TDengine Knowledge Map covers the various knowledge points of TDengine, reve - - - - - + + + + + + + + + + + + + +

Star GitHub

Join Discord

Follow Twitter

Subscribe YouTube

Follow LinkedIn

Star GitHubJoin DiscordFollow TwitterSubscribe YouTubeFollow LinkedInAsk StackOverflow
diff --git a/docs/en/05-get-started/stackoverflow.svg b/docs/en/05-get-started/stackoverflow.svg new file mode 100644 index 0000000000000000000000000000000000000000..22b4b64d3209aaeb1b4d571d17bfb34bfc230d24 --- /dev/null +++ b/docs/en/05-get-started/stackoverflow.svg @@ -0,0 +1,7 @@ + + + + diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index f942713f5da5a9ce9db9088c178f11813d9be48c..6a6fd50f2a47656856e5fd5936e3eed01481503f 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -171,6 +171,7 @@ typedef struct SExchangeLogicNode { SLogicNode node; int32_t srcStartGroupId; int32_t srcEndGroupId; + bool seqRecvData; } SExchangeLogicNode; typedef struct SMergeLogicNode { @@ -416,6 +417,7 @@ typedef struct SExchangePhysiNode { int32_t srcEndGroupId; bool singleChannel; SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode + bool seqRecvData; } SExchangePhysiNode; typedef struct SMergePhysiNode { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e1886511b714fa0a92b1b03d998730e606fa5042..e8428ea4700741254cabbc43dc3b4380a50f23ad 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -224,7 +224,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { ASSERT(taosArrayGetSize(pStream->tasks) == 1); while (1) { - SVgObj* pVgroup; + SVgObj* pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) { @@ -258,6 +258,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); ASSERT(pTask->tbSink.pSchemaWrapper); } + sdbRelease(pSdb, pVgroup); } return 0; } @@ -382,6 +383,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + sdbRelease(pSdb, pVgroup); } else { if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) { sdbRelease(pSdb, pSnode); @@ -396,6 +398,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + sdbRelease(pSdb, pVgroup); } } @@ -459,6 +462,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { pEpInfo->nodeId = pTask->nodeId; pEpInfo->taskId = pTask->taskId; taosArrayPush(pInnerTask->childEpInfo, &pEpInfo); + sdbRelease(pSdb, pVgroup); } } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index cee0b84672a6d8e5c57c6718f1f665b2ab82e934..5e16397b14fc537980302dfc064cae0e66b1d8ff 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1187,11 +1187,11 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, goto NEXT; } if (pCol->colId > 0 && pCol->colId == colId) { - sdbRelease(pSdb, pTopic); - nodesDestroyNode(pAst); - nodesDestroyList(pNodeList); terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC; mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId); + nodesDestroyNode(pAst); + nodesDestroyList(pNodeList); + sdbRelease(pSdb, pTopic); return -1; } mInfo("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); @@ -1230,11 +1230,11 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName goto NEXT; } if (pCol->colId > 0 && pCol->colId == colId) { - sdbRelease(pSdb, pStream); - nodesDestroyNode(pAst); - nodesDestroyList(pNodeList); terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId); + nodesDestroyNode(pAst); + nodesDestroyList(pNodeList); + sdbRelease(pSdb, pStream); return -1; } mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId); @@ -1279,11 +1279,11 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, goto NEXT; } if ((pCol->colId) > 0 && (pCol->colId == colId)) { - sdbRelease(pSdb, pSma); - nodesDestroyNode(pAst); - nodesDestroyList(pNodeList); terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA; mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId); + nodesDestroyNode(pAst); + nodesDestroyList(pNodeList); + sdbRelease(pSdb, pSma); return -1; } mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 38cb534d7f3e7f8343893db38b286b2da2eabdae..4324c412f7755a7414399f50f5606f7a36b8f328 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -298,35 +298,24 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; - if (pMeta->code == 0) { - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - rpcMsg.info = pMsg->info; - rpcMsg.info.conn.applyIndex = pMeta->index; - rpcMsg.info.conn.applyTerm = pMeta->term; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + rpcMsg.info = pMsg->info; + rpcMsg.info.conn.applyIndex = pMeta->index; + rpcMsg.info.conn.applyTerm = pMeta->term; - const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 - ", weak:%d, code:%d, state:%d %s, type:%s", - pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, - pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + ", weak:%d, code:%d, state:%d %s, type:%s", + pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, pMeta->code, + pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); - tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); - } else { - SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info}; - vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", pVnode->config.vgId, - TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code)); - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - if (pMeta->isWeak == 0) { - vnodeSyncApplyMsg(pFsm, pMsg, pMeta); - } + vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { @@ -420,7 +409,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; - vDebug("vgId:%d, become follower", pVnode->config.vgId); + vInfo("vgId:%d, become follower", pVnode->config.vgId); taosThreadMutexLock(&pVnode->lock); if (pVnode->blocked) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 2163c16f170ac6e4521bfdb2b22263ff706508e2..b01a4b7871750bbffa501e1a3e00510b53bd1459 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -51,9 +51,9 @@ typedef struct SSourceDataInfo { const char* taskId; } SSourceDataInfo; -static void destroyExchangeOperatorInfo(void* param); -static void freeBlock(void* pParam); -static void freeSourceDataInfo(void* param); +static void destroyExchangeOperatorInfo(void* param); +static void freeBlock(void* pParam); +static void freeSourceDataInfo(void* param); static void* setAllSourcesCompleted(SOperatorInfo* pOperator); static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code); @@ -62,7 +62,9 @@ static int32_t getCompletedSources(const SArray* pArray); static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator); static int32_t seqLoadRemoteData(SOperatorInfo* pOperator); static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator); -static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); +static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, + bool holdDataInBuf); +static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo) { @@ -105,41 +107,33 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->numOfRows == 0) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); break; } - SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; - int32_t index = 0; - char* pStart = pRetrieveRsp->data; - while (index++ < pRetrieveRsp->numOfBlocks) { - SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); - code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); - if (code != 0) { - taosMemoryFreeClear(pDataInfo->pRsp); - goto _error; - } - - taosArrayPush(pExchangeInfo->pResultBlockList, &pb); + code = doExtractResultBlocks(pExchangeInfo, pDataInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator); pDataInfo->totalRows += pRetrieveRsp->numOfRows; if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", total:%.2f Kb, try next %d/%" PRIzu, + " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", total:%.2f Kb, try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, - pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, - i + 1, totalSources); + pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, + totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } @@ -164,7 +158,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } } - _error: +_error: pTaskInfo->code = code; } @@ -305,17 +299,19 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; qAppendTaskStopInfo(pTaskInfo, &stopInfo); - - pInfo->seqLoadData = false; + + pInfo->seqLoadData = pExNode->seqRecvData; pInfo->pTransporter = pTransporter; - setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, + pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); - pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); return pOperator; - _error: +_error: if (pInfo != NULL) { doDestroyExchangeOperatorInfo(pInfo); } @@ -386,7 +382,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), + pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; @@ -434,14 +431,14 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas taosMemoryFree(pWrapper); return pTaskInfo->code; } - + void* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; taosMemoryFree(pWrapper); return pTaskInfo->code; } - + if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; taosMemoryFree(pWrapper); @@ -531,7 +528,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pLoadInfo->totalElapsed / 1000.0); @@ -581,15 +578,37 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - + tsem_post(&pExchangeInfo->ready); return TSDB_CODE_SUCCESS; } +int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) { + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; + + char* pStart = pRetrieveRsp->data; + int32_t index = 0; + int32_t code = 0; + while (index++ < pRetrieveRsp->numOfBlocks) { + SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); + + code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); + if (code != 0) { + taosMemoryFreeClear(pDataInfo->pRsp); + return code; + } + + taosArrayPush(pExchangeInfo->pResultBlockList, &pb); + } + + return code; +} + int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = 0; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); int64_t startTs = taosGetTimestampUs(); @@ -599,13 +618,15 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } + SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; + doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); if (pDataInfo->code != TSDB_CODE_SUCCESS) { @@ -619,7 +640,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 " try next", + ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); @@ -629,14 +650,15 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { continue; } - SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; - - char* pStart = pRetrieveRsp->data; - int32_t code = extractDataBlockFromFetchRsp(NULL, pStart, NULL, &pStart); + code = doExtractResultBlocks(pExchangeInfo, pDataInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); @@ -645,7 +667,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, + ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); } @@ -656,6 +678,10 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { taosMemoryFreeClear(pDataInfo->pRsp); return TSDB_CODE_SUCCESS; } + +_error: + pTaskInfo->code = code; + return code; } int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7fd2fcb5b86b2630886806dec845bcf178dd697c..013b8d39de558eba58571e784ad473820e2f9f2a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3655,6 +3655,11 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData setSessionWinOutputInfo(pStUpdated, &winInfo); winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap, pAggSup->pResultRows, pStUpdated, pStDeleted); + // coverity scan error + if (!winInfo.pOutputBuf) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, pOperator); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b9b365fb42c62ec9400b85450c3bca40ac920730..5b3e8ce5a9334e05fa184427469314482d960a74 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -434,6 +434,7 @@ static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicN COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(srcStartGroupId); COPY_SCALAR_FIELD(srcEndGroupId); + COPY_SCALAR_FIELD(seqRecvData); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 172c7694334d9a5199716434d169724c449eb037..462ac513a513601e5fef22c9cd6b1224e90f4ab3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1864,6 +1864,7 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId"; static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId"; static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints"; +static const char* jkExchangePhysiPlanSeqRecvData = "SeqRecvData"; static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj; @@ -1878,6 +1879,9 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkExchangePhysiPlanSeqRecvData, pNode->seqRecvData); + } return code; } @@ -1895,6 +1899,9 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkExchangePhysiPlanSeqRecvData, &pNode->seqRecvData); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 2879d55167f82821399b5296c16514af306d8dd6..1e8ff8da1a9d2484604199d91732ac2cf4ee141a 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2428,7 +2428,8 @@ enum { PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, - PHY_EXCHANGE_CODE_SRC_ENDPOINTS + PHY_EXCHANGE_CODE_SRC_ENDPOINTS, + PHY_EXCHANGE_CODE_SEQ_RECV_DATA }; static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2447,6 +2448,9 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_SRC_ENDPOINTS, nodeListToMsg, pNode->pSrcEndPoints); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SEQ_RECV_DATA, pNode->seqRecvData); + } return code; } @@ -2473,6 +2477,9 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) { case PHY_EXCHANGE_CODE_SRC_ENDPOINTS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSrcEndPoints); break; + case PHY_EXCHANGE_CODE_SEQ_RECV_DATA: + code = tlvDecodeBool(pTlv, &pNode->seqRecvData); + break; default: break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 54f450e9712b3e273ee9ec008d19ab4f0d590a24..0743b4066260c9dc517dc87131422134b9b12af8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3685,9 +3685,19 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery); } +static int32_t translateInsertTable(STranslateContext* pCxt, SNode* pTable) { + int32_t code = translateFrom(pCxt, pTable); + if (TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && + TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType) { + code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "insert data into super table is not supported"); + } + return code; +} + static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) { pCxt->pCurrStmt = (SNode*)pInsert; - int32_t code = translateFrom(pCxt, pInsert->pTable); + int32_t code = translateInsertTable(pCxt, pInsert->pTable); if (TSDB_CODE_SUCCESS == code) { code = translateInsertCols(pCxt, pInsert); } @@ -7089,9 +7099,10 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta, SVAlterTbReq* pReq) { - SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName); + SSchema* pSchema = getTagSchema(pTableMeta, pStmt->colName); if (NULL == pSchema) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid tag name: %s", + pStmt->colName); } pReq->tagName = strdup(pStmt->colName); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 89e8a858956afacfc0bf39f7ca4b7a060125da7e..44eb8478f174995313c2c2b619a2442b7e292270 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -36,6 +36,7 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi typedef struct SRewriteExprCxt { int32_t errCode; SNodeList* pExprs; + bool* pOutputs; } SRewriteExprCxt; static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) { @@ -63,14 +64,30 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) { } static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { + SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext; switch (nodeType(*pNode)) { + case QUERY_NODE_COLUMN: { + if (NULL != pCxt->pOutputs) { + SNode* pExpr; + int32_t index = 0; + FOREACH(pExpr, pCxt->pExprs) { + if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) { + pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0); + } + if (nodesEqualNode(pExpr, *pNode)) { + pCxt->pOutputs[index] = true; + break; + } + } + } + break; + } case QUERY_NODE_OPERATOR: case QUERY_NODE_LOGIC_CONDITION: case QUERY_NODE_FUNCTION: case QUERY_NODE_CASE_WHEN: { - SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext; - SNode* pExpr; - int32_t index = 0; + SNode* pExpr; + int32_t index = 0; FOREACH(pExpr, pCxt->pExprs) { if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) { pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0); @@ -89,6 +106,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) { } nodesDestroyNode(*pNode); *pNode = (SNode*)pCol; + if (NULL != pCxt->pOutputs) { + pCxt->pOutputs[index] = true; + } return DEAL_RES_IGNORE_CHILD; } ++index; @@ -121,7 +141,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) { static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClause clause) { nodesWalkExpr(pExpr, doNameExpr, NULL); - SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = NULL}; + SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = NULL, .pOutputs = NULL}; cxt.errCode = nodesListMakeAppend(&cxt.pExprs, pExpr); if (TSDB_CODE_SUCCESS == cxt.errCode) { nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); @@ -130,23 +150,50 @@ static int32_t rewriteExprForSelect(SNode* pExpr, SSelectStmt* pSelect, ESqlClau return cxt.errCode; } -static int32_t rewriteExprsForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { +static int32_t cloneRewriteExprs(SNodeList* pExprs, bool* pOutputs, SNodeList** pRewriteExpr) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t index = 0; + SNode* pExpr = NULL; + FOREACH(pExpr, pExprs) { + if (pOutputs[index]) { + code = nodesListMakeStrictAppend(pRewriteExpr, nodesCloneNode(pExpr)); + if (TSDB_CODE_SUCCESS != code) { + NODES_DESTORY_LIST(*pRewriteExpr); + break; + } + } + } + return code; +} + +static int32_t rewriteExprsForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause, + SNodeList** pRewriteExprs) { nodesWalkExprs(pExprs, doNameExpr, NULL); - SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs}; + SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL}; + if (NULL != pRewriteExprs) { + cxt.pOutputs = taosMemoryCalloc(LIST_LENGTH(pExprs), sizeof(bool)); + if (NULL == cxt.pOutputs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); + if (TSDB_CODE_SUCCESS == cxt.errCode && NULL != pRewriteExprs) { + cxt.errCode = cloneRewriteExprs(pExprs, cxt.pOutputs, pRewriteExprs); + } + taosMemoryFree(cxt.pOutputs); return cxt.errCode; } static int32_t rewriteExpr(SNodeList* pExprs, SNode** pTarget) { nodesWalkExprs(pExprs, doNameExpr, NULL); - SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs}; + SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL}; nodesRewriteExpr(pTarget, doRewriteExpr, &cxt); return cxt.errCode; } static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) { nodesWalkExprs(pExprs, doNameExpr, NULL); - SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs}; + SRewriteExprCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs, .pOutputs = NULL}; nodesRewriteExprs(pTarget, doRewriteExpr, &cxt); return cxt.errCode; } @@ -311,7 +358,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM); + code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM, NULL); } pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan); @@ -509,23 +556,20 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { - code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); + code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY, NULL); } if (NULL != pSelect->pGroupByList) { - if (NULL != pAgg->pGroupKeys) { - code = nodesListStrictAppendList(pAgg->pGroupKeys, nodesCloneList(pSelect->pGroupByList)); - } else { - pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList); - if (NULL == pAgg->pGroupKeys) { - code = TSDB_CODE_OUT_OF_MEMORY; - } + pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList); + if (NULL == pAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; } } // rewrite the expression in subsequent clauses + SNodeList* pOutputGroupKeys = NULL; if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); + code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY, &pOutputGroupKeys); } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { @@ -536,9 +580,11 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, } // set the output - if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) { - code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets); + if (TSDB_CODE_SUCCESS == code && NULL != pOutputGroupKeys) { + code = createColumnByRewriteExprs(pOutputGroupKeys, &pAgg->node.pTargets); } + nodesDestroyList(pOutputGroupKeys); + if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets); } @@ -574,7 +620,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt // indefinite rows functions and _select_values functions int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs); if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pIdfRowsFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); + code = rewriteExprsForSelect(pIdfRowsFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL); } // set the output @@ -612,7 +658,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p // interp functions and _group_key functions int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, isInterpFunc, &pInterpFunc->pFuncs); if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); + code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL); } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) { @@ -656,7 +702,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); + code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW, NULL); } if (TSDB_CODE_SUCCESS == code) { @@ -854,10 +900,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect int32_t code = partFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs); if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL); + code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL, NULL); } if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL); + code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL, NULL); } if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets); @@ -1066,7 +1112,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { - code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT); + code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT, NULL); } // set the output diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 45fa67faef79bcc167b12776fb496dcaef59a1ff..b8b6e444129cf92864cfcfef64e44f845027e0b2 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1476,19 +1476,33 @@ static bool partTagsHasIndefRowsSelectFunc(SNodeList* pFuncs) { return false; } -static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SNodeList* pAggFuncs) { - bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAggFuncs); +static bool partTagsNeedOutput(SNode* pExpr, SNodeList* pTargets) { + SNode* pOutput = NULL; + FOREACH(pOutput, pTargets) { + if (QUERY_NODE_COLUMN == nodeType(pExpr)) { + if (nodesEqualNode(pExpr, pOutput)) { + return true; + } + } else if (0 == strcmp(((SExprNode*)pExpr)->aliasName, ((SColumnNode*)pOutput)->colName)) { + return true; + } + } + return false; +} + +static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SAggLogicNode* pAgg) { + bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAgg->pAggFuncs); int32_t code = TSDB_CODE_SUCCESS; int32_t index = 0; SNode* pNode = NULL; FOREACH(pNode, pGroupTags) { - if (index++ < start) { + if (index++ < start || !partTagsNeedOutput(pNode, pAgg->node.pTargets)) { continue; } if (hasIndefRowsSelectFunc) { - code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode)); + code = nodesListStrictAppend(pAgg->pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode)); } else { - code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_group_key", pNode)); + code = nodesListStrictAppend(pAgg->pAggFuncs, partTagsCreateWrapperFunc("_group_key", pNode)); } if (TSDB_CODE_SUCCESS != code) { break; @@ -1541,7 +1555,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub } NODES_DESTORY_LIST(pAgg->pGroupKeys); if (TSDB_CODE_SUCCESS == code && start >= 0) { - code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg->pAggFuncs); + code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg); } } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 72f3d995bc9bf7edfd5cc9b585bd24f3069342c4..379bfe90c8c3e4a65beee8d1d66dce1aebcc325f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1064,6 +1064,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId; pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId; + pExchange->seqRecvData = pExchangeLogicNode->seqRecvData; *pPhyNode = (SPhysiNode*)pExchange; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 489046d88e73219fda229b6a2329096fd55d8f8f..bcf4b40e698ad8c6f9fde793af5eb538e44d9b6d 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -292,6 +292,43 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) { return true; } +static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + return ((SScanLogicNode*)pNode)->pGroupTags; + } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { + return ((SPartitionLogicNode*)pNode)->pPartitionKeys; + } else { + return NULL; + } +} + +static bool stbSplHasPartTbname(SNodeList* pPartKeys) { + if (NULL == pPartKeys) { + return false; + } + SNode* pPartKey = NULL; + FOREACH(pPartKey, pPartKeys) { + if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) { + pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0); + } + if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) || + (QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) { + return true; + } + } + return false; +} + +static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) { + if (NULL != pAgg->pGroupKeys) { + return stbSplHasPartTbname(pAgg->pGroupKeys); + } + if (1 != LIST_LENGTH(pAgg->node.pChildren)) { + return false; + } + return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); +} + static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: @@ -301,7 +338,9 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { case QUERY_NODE_LOGIC_PLAN_PARTITION: return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_AGG: - return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); + return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) || + stbSplIsPartTableAgg((SAggLogicNode*)pNode)) && + stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: return stbSplNeedSplitWindow(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SORT: @@ -676,27 +715,8 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) { } } -static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { - return ((SScanLogicNode*)pNode)->pGroupTags; - } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { - return ((SPartitionLogicNode*)pNode)->pPartitionKeys; - } else { - return NULL; - } -} - -static bool stbSplIsPartTbanme(SNodeList* pPartKeys) { - if (NULL == pPartKeys || 1 != LIST_LENGTH(pPartKeys)) { - return false; - } - SNode* pPartKey = nodesListGetNode(pPartKeys, 0); - return (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) || - (QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType); -} - static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) { - return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); + return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); } static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { @@ -713,6 +733,17 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI return TSDB_CODE_PLAN_INTERNAL_ERROR; } +static bool stbSplNeedSeqRecvData(SLogicNode* pNode) { + if (NULL == pNode) { + return false; + } + + if (NULL != pNode->pLimit || NULL != pNode->pSlimit) { + return true; + } + return stbSplNeedSeqRecvData(pNode->pParent); +} + static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { if (pCxt->pPlanCxt->streamQuery) { SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); @@ -728,6 +759,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange); } if (TSDB_CODE_SUCCESS == code) { + pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange); code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); } @@ -797,7 +829,17 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO return code; } -static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { +static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); + } + ++(pCxt->groupId); + return code; +} + +static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartAgg = NULL; int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); if (TSDB_CODE_SUCCESS == code) { @@ -812,6 +854,13 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } +static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + if (stbSplIsPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) { + return stbSplSplitAggNodeForPartTable(pCxt, pInfo); + } + return stbSplSplitAggNodeForCrossTable(pCxt, pInfo); +} + static SNode* stbSplCreateColumnNode(SExprNode* pExpr) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 1e72b4eb261ff1f7137df85ccddb0af8fe17e030..aa8d3bef517908f402667629bfa023c2045e18d1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -232,7 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); -bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode); +bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index acaf499cbbfa90a81814a4ceb8e0e3d8d546b784..7ceec29be4d0c74b6b86b95e5f5a64ee9cc83fc2 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -124,6 +124,7 @@ typedef struct SyncHeartbeat { SyncIndex commitIndex; SyncTerm privateTerm; SyncTerm minMatchIndex; + int64_t timeStamp; } SyncHeartbeat; typedef struct SyncHeartbeatReply { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 008e49c20c039eacc07d059fadbffae7b25c5760..2aaa13f95d529447bdaf3542c1eae8b2b91d9e7b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -640,7 +640,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } // heartbeat timeout - if (syncNodeHeartbeatTimeout(pSyncNode)) { + if (syncNodeHeartbeatReplyTimeout(pSyncNode)) { terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); @@ -2039,6 +2039,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; + pSyncMsg->timeStamp = taosGetTimestampMs(); // send msg syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); @@ -2094,7 +2095,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand return code; } -bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) { +bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) { if (pSyncNode->replicaNum == 1) { return false; } @@ -2148,7 +2149,11 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeat* pMsg = pRpcMsg->pCont; - syncLogRecvHeartbeat(ths, pMsg, ""); + + int64_t tsMs = taosGetTimestampMs(); + char buf[128]; + snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); + syncLogRecvHeartbeat(ths, pMsg, buf); SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); @@ -2161,6 +2166,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->timeStamp = taosGetTimestampMs(); if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); + syncNodeResetElectTimer(ths); ths->minMatchIndex = pMsg->minMatchIndex; @@ -2220,9 +2227,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsg = pRpcMsg->pCont; - syncLogRecvHeartbeatReply(ths, pMsg, ""); int64_t tsMs = taosGetTimestampMs(); + char buf[128]; + snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); + syncLogRecvHeartbeatReply(ths, pMsg, buf); // update last reply time, make decision whether the other node is alive or not syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); @@ -2500,6 +2509,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde SRpcMsg rpcMsg = {0}; syncEntry2OriginalRpc(pEntry, &rpcMsg); + sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType)); + // user commit if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) { bool internalExecute = true; @@ -2507,7 +2518,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde internalExecute = false; } - sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute); + sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute, + TMSG_INFO(pEntry->msgType)); // execute fsm in apply thread, or execute outside syncPropose if (internalExecute) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index de5f71e5a99222ed433ffaa192766c5e8b4fdac4..54c29febe5624a8be0068ef1df635a0bee01ed73 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -213,9 +213,11 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcM } int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { + int64_t ts = taosGetTimestampMs(); for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SRpcMsg rpcMsg = {0}; if (syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId) != 0) { + sError("vgId:%d, build sync-heartbeat error", pSyncNode->vgId); continue; } @@ -226,6 +228,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; + pSyncMsg->timeStamp = ts; // send msg syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 4c847862865fbd4be754a9a108924a7967fe2f17..1e5a268e9768060bc0fcddfdcae6f887f1610924 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncUtil.h" +#include "syncIndexMgr.h" #include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" @@ -175,6 +176,36 @@ void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { } } +// for leader +static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { + int32_t len = 5; + + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { + int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i])); + + if (i < pSyncNode->replicaNum - 1) { + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); + } else { + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs); + } + } +} + +// for follower +static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { + int32_t len = 4; + + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { + int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i])); + + if (i < pSyncNode->replicaNum - 1) { + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); + } else { + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs); + } + } +} + static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { int32_t len = 1; @@ -221,6 +252,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo char peerStr[1024] = "{"; syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); + char hbrTimeStr[256] = "hbr:{"; + syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr)); + + char hbTimeStr[256] = "hb:{"; + syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr)); + int32_t quorum = syncNodeDynamicQuorum(pNode); char eventLog[512]; // {0}; @@ -243,12 +280,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo "%s" ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64 - ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", + ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, + hbrTimeStr); } } @@ -395,9 +433,8 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); } void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { @@ -406,9 +443,9 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 + "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); } void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { @@ -416,8 +453,8 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->privateTerm, s); + sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->timeStamp, s); } void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 27611a1a176cabda2c60b0bcd95e0c24de6fa73a..079dbd8d02e03d210609acbf6ede2a61197f7b9d 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -417,8 +417,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py -#,,,system-test,python3 ./test.py -f 1-insert/alter_database.py -,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py +,,,system-test,python3 ./test.py -f 1-insert/alter_database.py +,,,system-test,python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py ,,,system-test,python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py ,,,system-test,python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py @@ -436,7 +436,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_param_ttl.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data_muti_rows.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/db_tb_name_check.py -,,,system-test,python3 ./test.py -f 1-insert/database_pre_suf.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/database_pre_suf.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/InsertFuturets.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py @@ -561,8 +561,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sin.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sin.py -R -,,,system-test,python3 ./test.py -f 2-query/smaTest.py -,,,system-test,python3 ./test.py -f 2-query/smaTest.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py @@ -595,8 +595,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Today.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -R -,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/twa.py @@ -617,8 +617,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_childtable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_normaltable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/keep_expired.py -,,,system-test,python3 ./test.py -f 1-insert/drop.py -,,,system-test,python3 ./test.py -f 1-insert/drop.py -N 3 -M 3 -i False -n 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/drop.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/drop.py -N 3 -M 3 -i False -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/union1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py @@ -820,7 +820,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 2 -,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 3 @@ -913,7 +913,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3 -,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3 diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 22e612797328440812d22b9795c6ad74f7e3b5c0..ee091303687b7a31f2c139d2f9d79a5fae52389f 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -312,8 +312,14 @@ class TDDnode: cmd = "mintty -h never %s -c %s" % ( binPath, self.cfgDir) else: - cmd = "nohup %s -c %s > /dev/null 2>&1 & " % ( - binPath, self.cfgDir) + if self.asan: + asanDir = "%s/sim/asan/dnode%d.asan" % ( + self.path, self.index) + cmd = "nohup %s -c %s > /dev/null 2> %s & " % ( + binPath, self.cfgDir, asanDir) + else: + cmd = "nohup %s -c %s > /dev/null 2>&1 & " % ( + binPath, self.cfgDir) else: valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\" --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir @@ -748,7 +754,7 @@ class TDDnodes: tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) def StopAllSigint(self): - tdLog.info("stop all dnodes sigint") + tdLog.info("stop all dnodes sigint, asan:%d" % self.asan) if self.asan: tdLog.info("execute script: %s" % self.stopDnodesSigintPath) os.system(self.stopDnodesSigintPath) @@ -756,7 +762,7 @@ class TDDnodes: return def stopAll(self): - tdLog.info("stop all dnodes") + tdLog.info("stop all dnodes, asan:%d" % self.asan) if self.asan: tdLog.info("execute script: %s" % self.stopDnodesPath) os.system(self.stopDnodesPath) diff --git a/tests/system-test/2-query/smaTest.py b/tests/system-test/2-query/smaTest.py index 0390bae114306929605f404a0f8b1bb1eca10bec..04fb893e75824534d3702dbf169b6531dd30d342 100644 --- a/tests/system-test/2-query/smaTest.py +++ b/tests/system-test/2-query/smaTest.py @@ -44,8 +44,8 @@ class TDTestCase: def run(self): # insert data dbname = "db" - self.insert_data1(f"{dbname}.t1", self.ts, 1000*10000) - self.insert_data1(f"{dbname}.t4", self.ts, 1000*10000) + self.insert_data1(f"{dbname}.t1", self.ts, 10*10000) + self.insert_data1(f"{dbname}.t4", self.ts, 10*10000) # test base case # self.test_case1() tdLog.debug(" LIMIT test_case1 ............ [OK]") @@ -53,7 +53,6 @@ class TDTestCase: # self.test_case2() tdLog.debug(" LIMIT test_case2 ............ [OK]") - # stop def stop(self): tdSql.close() @@ -77,15 +76,17 @@ class TDTestCase: # insert data1 def insert_data(self, tbname, ts_start, count): - pre_insert = "insert into %s values"%tbname + pre_insert = "insert into %s values" % tbname sql = pre_insert - tdLog.debug("insert table %s rows=%d ..."%(tbname, count)) + tdLog.debug("insert table %s rows=%d ..." % (tbname, count)) for i in range(count): - sql += " (%d,%d)"%(ts_start + i*1000, i ) - if i >0 and i%30000 == 0: + sql += " (%d,%d)" % (ts_start + i*1000, i) + if i > 0 and i % 20000 == 0: + tdLog.info("%d rows inserted" % i) tdSql.execute(sql) sql = pre_insert # end sql + tdLog.info("insert_data end") if sql != pre_insert: tdSql.execute(sql) @@ -93,15 +94,17 @@ class TDTestCase: return def insert_data1(self, tbname, ts_start, count): - pre_insert = "insert into %s values"%tbname + pre_insert = "insert into %s values" % tbname sql = pre_insert - tdLog.debug("insert table %s rows=%d ..."%(tbname, count)) + tdLog.debug("insert table %s rows=%d ..." % (tbname, count)) for i in range(count): - sql += " (%d,%d,%d)"%(ts_start + i*1000, i , i+1) - if i >0 and i%30000 == 0: + sql += " (%d,%d,%d)" % (ts_start + i*1000, i, i+1) + if i > 0 and i % 20000 == 0: + tdLog.info("%d rows inserted" % i) tdSql.execute(sql) sql = pre_insert # end sql + tdLog.info("insert_data1 end") if sql != pre_insert: tdSql.execute(sql) diff --git a/tests/system-test/test.py b/tests/system-test/test.py index cf9aba123cbc2b902dd13152a24cafd53bdbb03d..2017aad1ca7c980b2040d90e389e0101d8fd734e 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -464,6 +464,7 @@ if __name__ == "__main__": tdDnodes.init(deployPath, masterIp) tdDnodes.setTestCluster(testCluster) tdDnodes.setValgrind(valgrind) + tdDnodes.setAsan(asan) tdDnodes.stopAll() for dnode in tdDnodes.dnodes: tdDnodes.deploy(dnode.index,{}) diff --git a/tools/shell/inc/shellAuto.h b/tools/shell/inc/shellAuto.h index f86090d61886d10566609694f895f14919da46e5..b7bf5fa1019502acbeaefc8884d4553704f58702 100644 --- a/tools/shell/inc/shellAuto.h +++ b/tools/shell/inc/shellAuto.h @@ -39,4 +39,7 @@ void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb); // introduction void printfIntroduction(); +// show all commands help +void showHelp(); + #endif diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 534ecf3c4dbf097311f43612c650c6308fee05d1..01c8042c0e6f323eac50a5433de12abad1b99f59 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -108,6 +108,7 @@ SWords shellCommands[] = { {"drop topic ;", 0, 0, NULL}, {"drop stream ;", 0, 0, NULL}, {"explain select", 0, 0, NULL}, // 44 append sub sql + {"help;", 0, 0, NULL}, {"grant all on to ;", 0, 0, NULL}, {"grant read on to ;", 0, 0, NULL}, {"grant write on to ;", 0, 0, NULL}, @@ -386,6 +387,8 @@ void showHelp() { drop stream ;\n\ ----- E ----- \n\ explain select clause ...\n\ + ----- H ----- \n\ + help;\n\ ----- I ----- \n\ insert into values(...) ;\n\ insert into using tags(...) values(...) ;\n\ @@ -1478,24 +1481,36 @@ bool matchSelectQuery(TAOS* con, SShellCmd* cmd) { // if is input create fields or tags area, return true bool isCreateFieldsArea(char* p) { - char* left = strrchr(p, '('); - if (left == NULL) { - // like 'create table st' - return false; - } + // put to while, support like create table st(ts timestamp, bin1 binary(16), bin2 + blank + TAB + char* p1 = strdup(p); + bool ret = false; + while (1) { + char* left = strrchr(p1, '('); + if (left == NULL) { + // like 'create table st' + ret = false; + break; + } - char* right = strrchr(p, ')'); - if (right == NULL) { - // like 'create table st( ' - return true; - } + char* right = strrchr(p1, ')'); + if (right == NULL) { + // like 'create table st( ' + ret = true; + break; + } - if (left > right) { - // like 'create table st( ts timestamp, age int) tags(area ' - return true; + if (left > right) { + // like 'create table st( ts timestamp, age int) tags(area ' + ret = true; + break; + } + + // set string end by small for next strrchr search + *left = 0; } + taosMemoryFree(p1); - return false; + return ret; } bool matchCreateTable(TAOS* con, SShellCmd* cmd) { diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 8402a5a589b1ac4f766e97fefdd134777cf6daec..577021f460c5f0b859b5bed83e1d096c650b1709 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -134,6 +134,12 @@ int32_t shellRunCommand(char *command, bool recordHistory) { return 0; } + // add help or help; + if(strcmp(command, "help") == 0 || strcmp(command, "help;") == 0) { + showHelp(); + return 0; + } + if (recordHistory) shellRecordCommandToHistory(command); char quote = 0, *cmd = command;