From 075b5e94817d4c3e8d04b0a0e832acd0219c8ebe Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Aug 2023 18:41:54 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/stream.c | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 9548c3c327..d73bb1562e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -253,6 +253,25 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp return status; } +static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { + *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (*pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); + + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + return TSDB_CODE_SUCCESS; +} + int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); -- GitLab