diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4000e728359b7f88f339519474b2033b14502bf6..635fdcf45975d075456e3ffcaf566fe7aec978b0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -77,6 +77,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->chkInfo.version = ver; pTask->pMeta = pSnode->pMeta; + streamTaskOpenAllUpstreamInput(pTask); + pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { return -1; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index fcc0195bf4285b64f52e20ad6e747e5aec893158..fc1b788b773941da01535bf098474ba736a38d1f 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,45 +15,6 @@ #include "streamInt.h" -SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); - if (pData == NULL) { - return NULL; - } - - pData->type = blockType; - pData->srcVgId = srcVg; - pData->srcTaskId = pReq->upstreamTaskId; - - int32_t blockNum = pReq->blockNum; - SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); - if (pArray == NULL) { - taosFreeQitem(pData); - return NULL; - } - - ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); - - for (int32_t i = 0; i < blockNum; i++) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); - SSDataBlock* pDataBlock = taosArrayGet(pArray, i); - blockDecode(pDataBlock, pRetrieve->data); - - // TODO: refactor - pDataBlock->info.window.skey = be64toh(pRetrieve->skey); - pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); - pDataBlock->info.version = be64toh(pRetrieve->version); - pDataBlock->info.watermark = be64toh(pRetrieve->watermark); - memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN); - - pDataBlock->info.type = pRetrieve->streamBlockType; - pDataBlock->info.childId = pReq->upstreamChildId; - } - - pData->blocks = pArray; - return pData; -} - SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); if (pData == NULL) { @@ -243,7 +204,7 @@ void streamFreeQitem(SStreamQueueItem* data) { if (type == STREAM_INPUT__GET_RES) { blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) { + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d479dd44df7cffc99369cc78da13e5dc9672ea15..94e005b7904944e6fef09bfa8df8e4086814e278 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -774,7 +774,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fa3f149a43a5fb01a99d5b2715a2638d96555bde..3b954793deefc6f0e1f868d961c4ef562e984d0f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -504,6 +504,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) { int32_t remain = streamAlignTransferState(pTask); if (remain > 0) { + streamFreeQitem((SStreamQueueItem*)pBlock); qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain); return 0; } @@ -532,6 +533,8 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } } else { // non-dispatch task, do task state transfer directly qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + + streamFreeQitem((SStreamQueueItem*)pBlock); ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask);