diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3542788b4b429f7a3c545e1325aae35b49bf6580..cee2def85fc019e5ab00f287dd3efd8e510250cb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -122,6 +122,7 @@ typedef struct { int8_t type; int32_t srcVgId; + int32_t srcTaskId; int32_t childId; int64_t sourceVer; int64_t reqId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index caf20a499cf4ea621c56302573d709236e46d3f5..0eaeafd0b3309c8c6efec157c59110356bc0ef7e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -142,40 +142,6 @@ int32_t streamSchedExec(SStreamTask* pTask) { return 0; } -int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - int8_t status = 0; - - SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->srcVgId); - if (pBlock == NULL) { - streamTaskInputFail(pTask); - status = TASK_INPUT_STATUS__FAILED; - qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, - pTask->id.idStr); - } else { - int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); - // input queue is full, upstream is blocked now - status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED; - } - - // rsp by input status - void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - pDispatchRsp->inputStatus = status; - pDispatchRsp->streamId = htobe64(pReq->streamId); - pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); - pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); - pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); - pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); - - pRsp->pCont = buf; - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - tmsgSendRsp(pRsp); - - return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; -} - int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status = TASK_INPUT_STATUS__NORMAL; @@ -240,7 +206,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { int8_t status = 0; - SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); + SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index bb4b842787a640435f561d6e75074869da8885af..fcc0195bf4285b64f52e20ad6e747e5aec893158 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,6 +15,45 @@ #include "streamInt.h" +SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); + if (pData == NULL) { + return NULL; + } + + pData->type = blockType; + pData->srcVgId = srcVg; + pData->srcTaskId = pReq->upstreamTaskId; + + int32_t blockNum = pReq->blockNum; + SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); + if (pArray == NULL) { + taosFreeQitem(pData); + return NULL; + } + + ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); + + for (int32_t i = 0; i < blockNum; i++) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); + SSDataBlock* pDataBlock = taosArrayGet(pArray, i); + blockDecode(pDataBlock, pRetrieve->data); + + // TODO: refactor + pDataBlock->info.window.skey = be64toh(pRetrieve->skey); + pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); + pDataBlock->info.version = be64toh(pRetrieve->version); + pDataBlock->info.watermark = be64toh(pRetrieve->watermark); + memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN); + + pDataBlock->info.type = pRetrieve->streamBlockType; + pDataBlock->info.childId = pReq->upstreamChildId; + } + + pData->blocks = pArray; + return pData; +} + SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); if (pData == NULL) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a162f0e7703f4ed9edecebe1f946b1232482556e..fab7856cf6d8fbde96f99bd32656017cea873de6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -89,7 +89,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; @@ -115,7 +115,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId) { pReq->streamId = pTask->id.streamId; - pReq->dataSrcVgId = vgId; + pReq->srcVgId = vgId; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId;