diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cee2def85fc019e5ab00f287dd3efd8e510250cb..508392ff77ed82213799cb9b5ca297824dd4a72d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -574,8 +574,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); -int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, - int64_t dstTaskId); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupScheduleTrigger(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index fab7856cf6d8fbde96f99bd32656017cea873de6..b6a03153d37f3ce36a4c22790a37961afe2c6f1a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -25,6 +25,9 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; +static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, + int32_t numOfBlocks, int64_t dstTaskId, int32_t type); + static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->msgType = msgType; pMsg->pCont = pCont; @@ -112,8 +115,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { return 0; } -int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, - int64_t dstTaskId) { +int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, + int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; pReq->srcVgId = vgId; pReq->upstreamTaskId = pTask->id.taskId; @@ -121,6 +124,7 @@ int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTas pReq->upstreamNodeId = pTask->info.nodeId; pReq->blockNum = numOfBlocks; pReq->taskId = dstTaskId; + pReq->type = type; pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES); pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); @@ -446,7 +450,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { SStreamDispatchReq req = {0}; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; - code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId); + code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, ); if (code != TSDB_CODE_SUCCESS) { return code; }