diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a5b2416ea9fb4cbd009cc4eda131c8774f12df00..1f9f686f753b30150a0380fe1391623313da32a6 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,7 +25,7 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream); +static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); @@ -270,13 +270,13 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setEpToDownstreamTask(pTask, pSinkTask); + setTaskUpstreamEpInfo(pTask, pSinkTask); } return TSDB_CODE_SUCCESS; } -static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { +static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); if (pEpInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -301,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } -int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { +int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -424,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui return -1; } - return setEpToDownstreamTask(pTask, pDownstreamTask); + return setTaskUpstreamEpInfo(pTask, pDownstreamTask); } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, @@ -592,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr return TSDB_CODE_SUCCESS; } +static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) { + SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask); + } +} + static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -643,11 +651,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return code; } - SArray* pSinkTaskList = taosArrayGetP(pStream->tasks, 0); - for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { - SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setEpToDownstreamTask(pAggTask, pSinkTask); - } + setSinkTaskUpstreamInfo(pStream->tasks, pAggTask); + setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); // source level return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 19825b438b99f6d2b70c1f0800a7e325f3966f16..89beaadf72d485b1905cec330730395fdf3ab00d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1673,7 +1673,8 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { return code; } - tqDebug("vgId:%d s-task:%s received the checkpoint ready msg, handle it", vgId, pTask->id.idStr); + tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); streamProcessCheckpointReadyMsg(pTask); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b436362e1c748cf3f0a171ba584003fcc7bc6317..04f73707c2118adce92af080b967e0eca2960e7a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2648,6 +2648,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera for (int32_t i = 0; i < mapSize; i++) { SWinKey key = {0}; SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); + pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey)); buf = decodeSWinKey(buf, &key); buf = decodeSRowBuffPos(buf, pPos); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index dbec866705c02496cace9367a80016063843d8ee..cad0d589257fe3bae747fc3930199678c9e53447 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -113,6 +113,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i // TODO } continue; + } else if (output->info.type == STREAM_CHECKPOINT) { + continue; // checkpoint block not dispatch to downstream tasks } SSDataBlock block = {0};