提交 767b768e 编写于 作者: L Liu Jicong

fix(stream): fix shuffle vg id not initialized

上级 6f4f6c14
...@@ -151,33 +151,36 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* ...@@ -151,33 +151,36 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
ASSERT(pDb); ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pMnode->pSdb, pDb); ASSERT(0);
return -1;
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; }
int32_t sz = taosArrayGetSize(pVgs); sdbRelease(pMnode->pSdb, pDb);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv); SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
for (int32_t i = 0; i < sz; i++) { int32_t sz = taosArrayGetSize(pVgs);
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
for (int32_t j = 0; j < sinkLvSize; j++) { int32_t sinkLvSize = taosArrayGetSize(sinkLv);
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); for (int32_t i = 0; i < sz; i++) {
if (pLastLevelTask->nodeId == pVgInfo->vgId) { SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
pVgInfo->taskId = pLastLevelTask->taskId; for (int32_t j = 0; j < sinkLvSize; j++) {
break; SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
} if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
ASSERT(pVgInfo->taskId != 0);
break;
} }
} }
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
} }
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
} }
return 0; return 0;
} }
...@@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
// dispatch // dispatch
mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask); if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) {
qDestroyQueryPlan(pPlan);
return -1;
}
// exec // exec
pFinalTask->execType = TASK_EXEC__PIPE; pFinalTask->execType = TASK_EXEC__PIPE;
......
...@@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM ...@@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.upstreamNodeId = pTask->nodeId, .upstreamNodeId = pTask->nodeId,
.blockNum = blockNum, .blockNum = blockNum,
}; };
qInfo("dispatch from task %d (child id %d)", pTask->taskId, pTask->childId);
req.data = taosArrayInit(blockNum, sizeof(void*)); req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
...@@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM ...@@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
break; break;
} }
} }
ASSERT(vgId != 0);
} }
ASSERT(vgId != 0);
req.taskId = downstreamTaskId; req.taskId = downstreamTaskId;
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId,
downstreamTaskId, vgId);
// serialize // serialize
int32_t tlen; int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code); tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册