diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 898db47e865762de2eb26904c6b3b0a57129e6da..e67423ca6e40c6f8eecbf6d0c67079b87a094851 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -47,7 +47,6 @@ enum { TASK_STATUS__WAIT_DOWNSTREAM, TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER1, - TASK_STATUS__RECOVER2, TASK_STATUS__PAUSE, }; @@ -203,7 +202,7 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } -void* streamQueueNextItem(SStreamQueue* queue); +void* streamQueueNextItem(SStreamQueue* pQueue); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); @@ -251,7 +250,7 @@ typedef struct { int8_t reserved; } STaskSinkFetch; -typedef struct { +typedef struct SStreamChildEpInfo { int32_t nodeId; int32_t childId; int32_t taskId; @@ -276,32 +275,38 @@ typedef struct SStreamStatus { int8_t keepTaskStatus; } SStreamStatus; -typedef struct SHistoryDataRange { +typedef struct SHistDataRange { SVersionRange range; STimeWindow window; -} SHistoryDataRange; +} SHistDataRange; -struct SStreamTask { - SStreamId id; +typedef struct SSTaskBasicInfo { + int32_t nodeId; // vgroup id or snode id + SEpSet epSet; + int32_t selfChildId; int32_t totalLevel; int8_t taskLevel; - int8_t outputType; - int16_t dispatchMsgType; - SStreamStatus status; - int32_t selfChildId; - int32_t nodeId; // vgroup id - SEpSet epSet; - SCheckpointInfo chkInfo; - STaskExec exec; - int8_t fillHistory; // fill history + int8_t fillHistory; // is fill history task or not +} SSTaskBasicInfo; - SHistoryDataRange dataRange; - SStreamId historyTaskId; +typedef struct SDispatchMsgInfo { + void* pData; // current dispatch data + int16_t msgType; // dispatch msg type +} SDispatchMsgInfo; - // children info - SArray* childEpInfo; // SArray - int32_t nextCheckId; - SArray* checkpointInfo; // SArray +struct SStreamTask { + SStreamId id; + SSTaskBasicInfo info; + int8_t outputType; + SDispatchMsgInfo msgInfo; + SStreamStatus status; + SCheckpointInfo chkInfo; + STaskExec exec; + SHistDataRange dataRange; + SStreamId historyTaskId; + SArray* pUpstreamEpInfoList; // SArray, // children info + int32_t nextCheckId; + SArray* checkpointInfo; // SArray // output union { @@ -326,7 +331,7 @@ struct SStreamTask { // the followings attributes don't be serialized int32_t recoverTryingDownstream; - int32_t recoverWaitingUpstream; + int32_t numOfWaitingUpstream; int64_t checkReqId; SArray* checkReqIds; // shuffle int32_t refCnt; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a1c81a999f8dfc4e9df16270ad4a5bebcc2608f8..c6d8bb5ffeffc261469608ee25e66928e4805702 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -110,7 +110,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr isShuffle = true; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { return -1; } @@ -131,7 +131,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr for (int32_t j = 0; j < numOfSinkNodes; j++) { SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j); - if (pSinkTask->nodeId == pVgInfo->vgId) { + if (pSinkTask->info.nodeId == pVgInfo->vgId) { pVgInfo->taskId = pSinkTask->id.taskId; break; } @@ -148,11 +148,11 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t msgLen; - pTask->nodeId = pVgroup->vgId; - pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); + pTask->info.nodeId = pVgroup->vgId; + pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); - plan->execNode.nodeId = pTask->nodeId; - plan->execNode.epSet = pTask->epSet; + plan->execNode.nodeId = pTask->info.nodeId; + plan->execNode.epSet = pTask->info.epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; @@ -172,11 +172,11 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) { int32_t msgLen; - pTask->nodeId = SNODE_HANDLE; - pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode); + pTask->info.nodeId = SNODE_HANDLE; + pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode); plan->execNode.nodeId = SNODE_HANDLE; - plan->execNode.epSet = pTask->epSet; + plan->execNode.epSet = pTask->info.epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -232,8 +232,8 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p return -1; } - pTask->nodeId = vgId; - pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); + pTask->info.nodeId = vgId; + pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); mndSetSinkTaskInfo(pStream, pTask); return 0; } @@ -273,9 +273,9 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { return NULL; } - pEpInfo->childId = pTask->selfChildId; - pEpInfo->epSet = pTask->epSet; - pEpInfo->nodeId = pTask->nodeId; + pEpInfo->childId = pTask->info.selfChildId; + pEpInfo->epSet = pTask->info.epSet; + pEpInfo->nodeId = pTask->info.nodeId; pEpInfo->taskId = pTask->id.taskId; return pEpInfo; @@ -284,11 +284,11 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher; pDispatcher->taskId = pTask->id.taskId; - pDispatcher->nodeId = pTask->nodeId; - pDispatcher->epSet = pTask->epSet; + pDispatcher->nodeId = pTask->info.nodeId; + pDispatcher->epSet = pTask->info.epSet; pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; - pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { @@ -297,11 +297,11 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { return TSDB_CODE_OUT_OF_MEMORY; } - if(pDownstream->childEpInfo == NULL) { - pDownstream->childEpInfo = taosArrayInit(4, POINTER_BYTES); + if(pDownstream->pUpstreamEpInfoList == NULL) { + pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES); } - taosArrayPush(pDownstream->childEpInfo, &pEpInfo); + taosArrayPush(pDownstream->pUpstreamEpInfoList, &pEpInfo); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d4c1e033dcb0c03bfc52c9d3d578423f61b0ffe4..74138aab5ef655f003a0926f50b62a942f8c2780 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -444,7 +444,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { return -1; } - ((SMsgHead *)buf)->vgId = htonl(pTask->nodeId); + ((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, abuf, size); @@ -454,7 +454,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { STransAction action = {0}; action.mTraceId = pTrans->mTraceId; - memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = buf; action.contLen = tlen; action.msgType = TDMT_STREAM_TASK_DEPLOY; @@ -637,17 +637,17 @@ _OVER: static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { // vnode - /*if (pTask->nodeId > 0) {*/ + /*if (pTask->info.nodeId > 0) {*/ SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pReq->head.vgId = htonl(pTask->nodeId); + pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; STransAction action = {0}; - memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; action.contLen = sizeof(SVDropStreamTaskReq); action.msgType = TDMT_STREAM_TASK_DROP; @@ -870,7 +870,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con SMStreamDoCheckpointMsg *pMsg) { SStreamCheckpointSourceReq req = {0}; req.checkpointId = pMsg->checkpointId; - req.nodeId = pTask->nodeId; + req.nodeId = pTask->info.nodeId; req.expireTime = -1; req.streamId = pTask->streamId; req.taskId = pTask->taskId; @@ -899,7 +899,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(pTask->nodeId); + pMsgHead->vgId = htonl(pTask->info.nodeId); tEncoderClear(&encoder); @@ -938,12 +938,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { for (int32_t i = 0; i < totLevel; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - /*A(pTask->nodeId > 0);*/ - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId); + /*A(pTask->info.nodeId > 0);*/ + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); if (pVgObj == NULL) { taosRUnLockLatch(&pStream->lock); mndReleaseStream(pMnode, pStream); @@ -1262,7 +1262,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; varDataSetLen(nodeType, 5); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - if (pTask->nodeId > 0) { + if (pTask->info.nodeId > 0) { memcpy(varDataVal(nodeType), "vnode", 5); } else { memcpy(varDataVal(nodeType), "snode", 5); @@ -1271,21 +1271,21 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // node id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - int64_t nodeId = TMAX(pTask->nodeId, 0); + int64_t nodeId = TMAX(pTask->info.nodeId, 0); colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false); // level char level[20 + VARSTR_HEADER_SIZE] = {0}; - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { memcpy(varDataVal(level), "source", 6); varDataSetLen(level, 6); - } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { memcpy(varDataVal(level), "agg", 3); varDataSetLen(level, 3); - } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { memcpy(varDataVal(level), "sink", 4); varDataSetLen(level, 4); - } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&level, false); @@ -1323,10 +1323,10 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pReq->head.vgId = htonl(pTask->nodeId); + pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; STransAction action = {0}; - memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; action.contLen = sizeof(SVPauseStreamTaskReq); action.msgType = TDMT_STREAM_TASK_PAUSE; @@ -1344,7 +1344,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) { + if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) { return -1; } } @@ -1446,11 +1446,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pReq->head.vgId = htonl(pTask->nodeId); + pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; pReq->igUntreated = igUntreated; STransAction action = {0}; - memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); action.pCont = pReq; action.contLen = sizeof(SVResumeStreamTaskReq); action.msgType = TDMT_STREAM_TASK_RESUME; @@ -1468,7 +1468,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 678dd34e4a6e12ba3573f4d8103e10bcba754119..cf1481a113d0b74d3fc7ec7e336f67baa686e663 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -62,7 +62,7 @@ FAIL: } int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { - ASSERT(pTask->taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->childEpInfo) != 0); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -85,7 +85,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } - int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo); + int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList); SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState }; initStreamStateAPI(&handle.api); @@ -151,7 +151,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { } tDecoderClear(&decoder); - ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG); // 2.save task taosWLockLatch(&pSnode->pMeta->lock); @@ -164,7 +164,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { taosWUnLockLatch(&pSnode->pMeta->lock); // 3.go through recover steps to fill history - if (pTask->fillHistory) { + if (pTask->info.fillHistory) { streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 84e37649ac4835b4aafd055f79d2b3b4ad31b34c..da05e950ce1323d083099eb128004bf80784ca07 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -820,9 +820,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->dataRange.range.minVer = ver; // expand executor - pTask->status.taskStatus = /*(pTask->fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/; + pTask->status.taskStatus = /*(pTask->info.fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/; - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { return -1; @@ -837,13 +837,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); - } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { return -1; } - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo); + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState}; initStorageAPI(&handle.api); @@ -879,7 +879,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr); } - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } @@ -887,7 +887,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, - pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel); // next valid version will add one pTask->chkInfo.version += 1; @@ -1028,7 +1028,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWUnLockLatch(&pStreamMeta->lock); // 3. It's an fill history task, do nothing. wait for the main task to start it - if (pTask->fillHistory) { + if (pTask->info.fillHistory) { tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); } else { // calculate the correct start time window, and start the handle the history data for the main task. @@ -1037,7 +1037,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms streamTaskStartHistoryTask(pTask, sversion); // launch current task - SHistoryDataRange* pRange = &pTask->dataRange; + SHistDataRange* pRange = &pTask->dataRange; int64_t ekey = pRange->window.ekey; int64_t ver = pRange->range.minVer; @@ -1093,7 +1093,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); - if (pTask->fillHistory) { + if (pTask->info.fillHistory) { // todo transfer the executor status, and then destroy this stream task } else { // todo update the chkInfo version for current task. @@ -1199,7 +1199,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - atomic_store_8(&pTask->fillHistory, 0); + atomic_store_8(&pTask->info.fillHistory, 0); streamMetaSaveTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1392,7 +1392,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version - if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended. walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 @@ -1403,7 +1403,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { tqStartStreamTasks(pTq); } else { streamSchedExec(pTask); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 155ef92ae5025d6b6e7a42e7e95d87078d186225..e5622647118017b573d8ced380dfdfa7ea47b6dd 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1109,7 +1109,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd); if (code != 0) { tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index fe80f486918413390ee7916fb97fe07c58a1b80d..cafb64e44ad8c2599b60fc8f8e0c9684bda9deca 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -129,8 +129,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t status = pTask->status.taskStatus; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { -// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { +// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->info.taskLevel); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt index 8ba8b79ab80430395131ad10d7c7912dc17879c2..7a984cd000caf314f59e74baad09d898800bf19d 100644 --- a/source/libs/qworker/CMakeLists.txt +++ b/source/libs/qworker/CMakeLists.txt @@ -7,15 +7,9 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -IF (TD_GRANT) - TARGET_LINK_LIBRARIES(qworker - PRIVATE os util transport nodes planner qcom executor index grant - ) -ELSE () - TARGET_LINK_LIBRARIES(qworker - PRIVATE os util transport nodes planner qcom executor index - ) -ENDIF() +TARGET_LINK_LIBRARIES(qworker + PRIVATE os util transport nodes planner qcom executor index + ) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index e12a0fdd435689958b86972ac1c0a2c3d4ed8818..3ed7bc8f7e3d9be0d4d753d632a995e3ec70aff7 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -40,8 +40,6 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); -int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); - int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7d0a44d2b82bcd3e632cab804c5ff4f80a2b4414..bdf7358f9073e4a52ade1025e2b4810e0a834719 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -110,7 +110,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { return -1; } - pRunReq->head.vgId = pTask->nodeId; + pRunReq->head.vgId = pTask->info.nodeId; pRunReq->streamId = pTask->id.streamId; pRunReq->taskId = pTask->id.taskId; @@ -146,7 +146,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR pDispatchRsp->streamId = htobe64(pReq->streamId); pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); - pDispatchRsp->downstreamNodeId = htonl(pTask->nodeId); + pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); pRsp->pCont = buf; @@ -162,7 +162,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId, + qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->info.selfChildId, pReq->srcTaskId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; @@ -278,10 +278,10 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - qDebug("s-task:%s receive retrieve req from node %d taskId:0x%x", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); + qDebug("s-task:%s receive retrieve req from taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->srcTaskId, pReq->srcNodeId); streamTaskEnqueueRetrieve(pTask, pReq, pRsp); - ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); + ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK); streamSchedExec(pTask); return 0; } @@ -299,7 +299,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); @@ -319,7 +319,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); @@ -350,19 +350,21 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } -void* streamQueueNextItem(SStreamQueue* queue) { - int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); - if (dequeueFlag == STREAM_QUEUE__FAILED) { - ASSERT(queue->qItem != NULL); - return streamQueueCurItem(queue); +void* streamQueueNextItem(SStreamQueue* pQueue) { + int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); + + if (flag == STREAM_QUEUE__FAILED) { + ASSERT(pQueue->qItem != NULL); + return streamQueueCurItem(pQueue); } else { - queue->qItem = NULL; - taosGetQitem(queue->qall, &queue->qItem); - if (queue->qItem == NULL) { - taosReadAllQitems(queue->queue, queue->qall); - taosGetQitem(queue->qall, &queue->qItem); + pQueue->qItem = NULL; + taosGetQitem(pQueue->qall, &pQueue->qItem); + if (pQueue->qItem == NULL) { + taosReadAllQitems(pQueue->queue, pQueue->qall); + taosGetQitem(pQueue->qall, &pQueue->qItem); } - return streamQueueCurItem(queue); + + return streamQueueCurItem(pQueue); } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 670cfbead1e180061fe0f972290351125eb9852c..722c557b8f1e2b6b44bf851454f60ed6ca14ad23 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -123,7 +123,7 @@ int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pR static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) { if (pTask->checkpointingId == 0) { pTask->checkpointingId = checkpointId; - pTask->checkpointAlignCnt = taosArrayGetSize(pTask->childEpInfo); + pTask->checkpointAlignCnt = taosArrayGetSize(pTask->pUpstreamEpInfoList); } ASSERT(pTask->checkpointingId == checkpointId); @@ -165,7 +165,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId; - if (taosArrayGetSize(pTask->childEpInfo) > 0) { + if (taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0) { code = streamAlignCheckpoint(pTask, checkpointId, childId); if (code > 0) { return 0; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 7c06e7deb30501368b3588cf0906841fd8afaf54..84b5eb3ab70d646c60955d4af4e43c3431fbb5ac 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -64,11 +64,11 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; - pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->childId = pTask->info.selfChildId; pStreamBlocks->sourceVer = pSubmit->ver; } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; - pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->childId = pTask->info.selfChildId; pStreamBlocks->sourceVer = pMerged->ver; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 26dd19ce7ecc1ada962434c5c22fc976a7cefd86..25ce470b3390b74af77c2edc04eddb4da25c5543 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -15,7 +15,9 @@ #include "streamInc.h" -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { +static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); + +static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; @@ -37,6 +39,37 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p return pEncoder->pos; } +static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + if (buf == NULL) return -1; + + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = 0; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->streamBlockType = pBlock->info.type; + pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); + pRetrieve->skey = htobe64(pBlock->info.window.skey); + pRetrieve->ekey = htobe64(pBlock->info.window.ekey); + pRetrieve->version = htobe64(pBlock->info.version); + pRetrieve->watermark = htobe64(pBlock->info.watermark); + memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + + int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); + pRetrieve->numOfCols = htonl(numOfCols); + + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(pReq->dataLen, &actualLen); + taosArrayPush(pReq->data, &buf); + + pReq->totalLen += dataStrLen; + return 0; +} + int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; @@ -125,17 +158,17 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) SStreamRetrieveReq req = { .streamId = pTask->id.streamId, - .srcNodeId = pTask->nodeId, + .srcNodeId = pTask->info.nodeId, .srcTaskId = pTask->id.taskId, .pRetrieve = pRetrieve, .retrieveLen = dataStrLen, }; - int32_t sz = taosArrayGetSize(pTask->childEpInfo); + int32_t sz = taosArrayGetSize(pTask->pUpstreamEpInfoList); ASSERT(sz > 0); for (int32_t i = 0; i < sz; i++) { req.reqId = tGenIdPI64(); - SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); + SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); req.dstNodeId = pEpInfo->nodeId; req.dstTaskId = pEpInfo->taskId; int32_t len; @@ -165,7 +198,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) buf = NULL; qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, - pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); + pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); } code = 0; @@ -175,37 +208,6 @@ CLEAR: return code; } -static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); - void* buf = taosMemoryCalloc(1, dataStrLen); - if (buf == NULL) return -1; - - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = 0; - pRetrieve->precision = TSDB_DEFAULT_PRECISION; - pRetrieve->compressed = 0; - pRetrieve->completed = 1; - pRetrieve->streamBlockType = pBlock->info.type; - pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); - pRetrieve->skey = htobe64(pBlock->info.window.skey); - pRetrieve->ekey = htobe64(pBlock->info.window.ekey); - pRetrieve->version = htobe64(pBlock->info.version); - pRetrieve->watermark = htobe64(pBlock->info.watermark); - memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); - - int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); - pRetrieve->numOfCols = htonl(numOfCols); - - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); - actualLen += sizeof(SRetrieveTableRsp); - ASSERT(actualLen <= dataStrLen); - taosArrayPush(pReq->dataLen, &actualLen); - taosArrayPush(pReq->data, &buf); - - pReq->totalLen += dataStrLen; - return 0; -} - int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; @@ -315,7 +317,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in msg.contLen = tlen + sizeof(SMsgHead); msg.pCont = buf; - msg.msgType = pTask->dispatchMsgType; + msg.msgType = pTask->msgInfo.msgType; qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); @@ -383,12 +385,12 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat .streamId = pTask->id.streamId, .dataSrcVgId = pData->srcVgId, .upstreamTaskId = pTask->id.taskId, - .upstreamChildId = pTask->selfChildId, - .upstreamNodeId = pTask->nodeId, + .upstreamChildId = pTask->info.selfChildId, + .upstreamNodeId = pTask->info.nodeId, .blockNum = numOfBlocks, }; - req.data = taosArrayInit(numOfBlocks, sizeof(void*)); + req.data = taosArrayInit(numOfBlocks, POINTER_BYTES); req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); if (req.data == NULL || req.dataLen == NULL) { taosArrayDestroyP(req.data, taosMemoryFree); @@ -413,8 +415,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; - qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to down stream s-task:0x%x in vgId:%d", pTask->id.idStr, - pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId); + qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d)", pTask->id.idStr, + pTask->info.selfChildId, numOfBlocks, downstreamTaskId, vgId); code = doSendDispatchMsg(pTask, &req, vgId, pEpSet); taosArrayDestroyP(req.data, taosMemoryFree); @@ -436,8 +438,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat pReqs[i].streamId = pTask->id.streamId; pReqs[i].dataSrcVgId = pData->srcVgId; pReqs[i].upstreamTaskId = pTask->id.taskId; - pReqs[i].upstreamChildId = pTask->selfChildId; - pReqs[i].upstreamNodeId = pTask->nodeId; + pReqs[i].upstreamChildId = pTask->info.selfChildId; + pReqs[i].upstreamNodeId = pTask->info.nodeId; pReqs[i].blockNum = 0; pReqs[i].data = taosArrayInit(0, sizeof(void*)); pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t)); @@ -471,13 +473,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } } - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId, numOfBlocks, vgSz); for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { @@ -506,26 +508,26 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { numOfElems); } + // to make sure only one dispatch is running int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { - qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); + qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); return 0; } qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus); - SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); - if (pDispatchedBlock == NULL) { + SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); + if (pBlock == NULL) { atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr, - pTask->outputStatus); + qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputStatus); return 0; } - ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - int32_t code = streamDispatchAllBlocks(pTask, pDispatchedBlock); + int32_t code = streamDispatchAllBlocks(pTask, pBlock); if (code != TSDB_CODE_SUCCESS) { streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); @@ -533,6 +535,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } // this block can be freed only when it has been pushed to down stream. - destroyStreamDataBlock(pDispatchedBlock); + destroyStreamDataBlock(pBlock); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 46290c306f5b641c21309b2dcc9c97554c93e723..096ec25af315292708a9f8985da7d9dd90594ff5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -108,10 +108,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); block.info.type = STREAM_PULL_OVER; - block.info.childId = pTask->selfChildId; + block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); numOfBlocks += 1; - qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, + qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId, pRetrieveBlock->reqId); } @@ -127,7 +127,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i SSDataBlock block = {0}; assignOneDataBlock(&block, output); - block.info.childId = pTask->selfChildId; + block.info.childId = pTask->info.selfChildId; size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); numOfBlocks += 1; @@ -135,7 +135,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i taosArrayPush(pRes, &block); qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, - pTask->selfChildId, numOfBlocks, size / 1048576.0); + pTask->info.selfChildId, numOfBlocks, size / 1048576.0); // current output should be dispatched to down stream nodes if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) { @@ -164,7 +164,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t code = 0; - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); void* exec = pTask->exec.pExecutor; qSetStreamOpOpen(exec); @@ -200,7 +200,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { SSDataBlock block = {0}; assignOneDataBlock(&block, output); - block.info.childId = pTask->selfChildId; + block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); batchCnt++; @@ -275,7 +275,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { return -1; } - if (pTask->taskLevel == TASK_LEVEL__SINK) { + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem); } @@ -344,7 +344,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { - if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { times++; taosMsleep(10); qDebug("===stream===try again batchSize:%d", batchSize); @@ -358,7 +358,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pInput == NULL) { pInput = qItem; streamQueueProcessSuccess(pTask->inputQueue); - if (pTask->taskLevel == TASK_LEVEL__SINK) { + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { break; } } else { @@ -392,7 +392,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } - if (pTask->taskLevel == TASK_LEVEL__SINK) { + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); @@ -400,7 +400,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } // wait for the task to be ready to go - while (pTask->taskLevel == TASK_LEVEL__SOURCE) { + while (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, @@ -423,7 +423,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ef3ab2ae468da5ae6ba50c2469a05489f6e33432..a4af4c30a153c321866d11c34d3b0d48d45eeaf1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -388,7 +388,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { // todo handle the fill history task ASSERT(0); - if (pTask->fillHistory) { + if (pTask->info.fillHistory) { ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); streamTaskCheckDownstreamTasks(pTask); } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0bcb078d51b10b2538fa5833fba236131058ba33..ec52e6fbe8e7344796d5ceb066ab8a3248771418 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -27,9 +27,9 @@ const char* streamGetTaskStatusStr(int32_t status) { } } int32_t streamTaskLaunchRecover(SStreamTask* pTask) { - qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->nodeId); + qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->info.nodeId); - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); SVersionRange* pRange = &pTask->dataRange.range; @@ -56,11 +56,11 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) { /*ASSERT(0);*/ } - } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamSetStatusNormal(pTask); streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); - } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { streamSetStatusNormal(pTask); qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr); } @@ -77,8 +77,8 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->nodeId, - .childId = pTask->selfChildId, + .upstreamNodeId = pTask->info.nodeId, + .childId = pTask->info.selfChildId, }; // serialize @@ -89,7 +89,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -105,12 +105,12 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId); + qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->info.nodeId); streamTaskLaunchRecover(pTask); } @@ -128,7 +128,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .childId = pRsp->childId, }; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { @@ -229,7 +229,7 @@ int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerR } int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) { - pReq->msgHead.vgId = pTask->nodeId; + pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; pReq->taskId = pTask->id.taskId; return 0; @@ -240,7 +240,7 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { } int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) { - pReq->msgHead.vgId = pTask->nodeId; + pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; pReq->taskId = pTask->id.taskId; return 0; @@ -264,7 +264,7 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { } int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) { - SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId }; + SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { @@ -287,9 +287,9 @@ int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) { // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { - pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo); + pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete fill history procedure", pTask->id.idStr, - pTask->recoverWaitingUpstream); + pTask->numOfWaitingUpstream); return 0; } @@ -306,8 +306,8 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { } int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { - if (pTask->taskLevel == TASK_LEVEL__AGG) { - int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1); + if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left); ASSERT(left >= 0); if (left == 0) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index de10f021d25f899702d421722917bb79abaf78a7..7397c90f717b0a01434132b537c3b8f62c7528cd 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -19,7 +19,7 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); - pTask->selfChildId = childId; + pTask->info.selfChildId = childId; taosArrayPush(pArray, &pTask); return 0; } @@ -33,8 +33,8 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.taskId = tGenIdPI32(); pTask->id.streamId = streamId; - pTask->taskLevel = taskLevel; - pTask->fillHistory = fillHistory; + pTask->info.taskLevel = taskLevel; + pTask->info.fillHistory = fillHistory; pTask->triggerParam = triggerParam; char buf[128] = {0}; @@ -71,21 +71,21 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; - if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; + if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1; if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1; @@ -94,14 +94,14 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; - int32_t epSz = taosArrayGetSize(pTask->childEpInfo); + int32_t epSz = taosArrayGetSize(pTask->pUpstreamEpInfoList); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->childEpInfo, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; } - if (pTask->taskLevel != TASK_LEVEL__SINK) { + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } @@ -131,21 +131,21 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; - if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; + if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1; @@ -156,7 +156,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; - pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*)); + + pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES); for (int32_t i = 0; i < epSz; i++) { SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); if (pInfo == NULL) return -1; @@ -164,10 +165,10 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { taosMemoryFreeClear(pInfo); return -1; } - taosArrayPush(pTask->childEpInfo, &pInfo); + taosArrayPush(pTask->pUpstreamEpInfoList, &pInfo); } - if (pTask->taskLevel != TASK_LEVEL__SINK) { + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; } @@ -217,7 +218,7 @@ void tFreeStreamTask(SStreamTask* pTask) { walCloseReader(pTask->exec.pWalReader); } - taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); + taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree); if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema);