提交 dcb91082 编写于 作者: H Haojun Liao

fix(stream): fix error.

上级 cb75e5a8
...@@ -223,11 +223,9 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -223,11 +223,9 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = { .info = pMsg->info, .code = 0};
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp); streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
tDeleteStreamRetrieveReq(&req); tDeleteStreamRetrieveReq(&req);
......
...@@ -161,7 +161,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, ...@@ -161,7 +161,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue // enqueue
if (pData != NULL) { if (pData != NULL) {
qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId,
pReq->srcTaskId, pReq->reqId); pReq->srcTaskId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->type = STREAM_INPUT__DATA_RETRIEVE;
...@@ -271,7 +271,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { ...@@ -271,7 +271,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
} }
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); qDebug("s-task:%s receive retrieve req from node %d taskId:0x%x", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp); streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
......
...@@ -138,7 +138,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -138,7 +138,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
req.dstNodeId = pEpInfo->nodeId; req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId; req.dstTaskId = pEpInfo->taskId;
int32_t code;
int32_t len; int32_t len;
tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code); tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
if (code < 0) { if (code < 0) {
...@@ -158,23 +157,18 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -158,23 +157,18 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
tEncodeStreamRetrieveReq(&encoder, &req); tEncodeStreamRetrieveReq(&encoder, &req);
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = { .code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len };
.code = 0,
.msgType = TDMT_STREAM_RETRIEVE,
.pCont = buf,
.contLen = sizeof(SMsgHead) + len,
};
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
ASSERT(0); ASSERT(0);
goto CLEAR; goto CLEAR;
} }
buf = NULL;
qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->id.idStr, buf = NULL;
qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr,
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
} }
code = 0; code = 0;
CLEAR: CLEAR:
taosMemoryFree(pRetrieve); taosMemoryFree(pRetrieve);
rpcFreeCont(buf); rpcFreeCont(buf);
...@@ -400,12 +394,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -400,12 +394,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if (req.data == NULL || req.dataLen == NULL) { if (req.data == NULL || req.dataLen == NULL) {
taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen); taosArrayDestroy(req.dataLen);
return code; return TSDB_CODE_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
if (streamAddBlockIntoDispatchMsg(pDataBlock, &req) < 0) { code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen); taosArrayDestroy(req.dataLen);
return code; return code;
......
...@@ -108,7 +108,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -108,7 +108,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
block.info.type = STREAM_PULL_OVER; block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
numOfBlocks += 1;
qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
pRetrieveBlock->reqId); pRetrieveBlock->reqId);
} }
...@@ -152,14 +152,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -152,14 +152,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
if (numOfBlocks > 0) { if (numOfBlocks > 0) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes)); ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else { } else {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
} }
return 0; return code;
} }
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册