提交 930b267a 编写于 作者: H Haojun Liao

fix(stream): set the correct start offset for stream task.

上级 4ed26bbc
...@@ -230,23 +230,21 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* ...@@ -230,23 +230,21 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq*
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) { if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) {
goto FAIL; rpcFreeCont(buf);
return code;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead); msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf; msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECK; msg.msgType = TDMT_STREAM_TASK_CHECK;
qDebug("dispatch from s-task:%s to downstream s-task:%"PRIx64":%d node %d: check msg", pTask->id.idStr, qDebug("dispatch from s-task:%s to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr,
pReq->streamId, pReq->downstreamTaskId, nodeId); pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
return 0; return 0;
FAIL:
if (buf) rpcFreeCont(buf);
return code;
} }
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
......
...@@ -189,7 +189,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -189,7 +189,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt);
streamDispatch(pTask); streamDispatch(pTask);
} }
if (finished) break;
if (finished) {
break;
}
} }
return 0; return 0;
} }
......
...@@ -296,8 +296,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -296,8 +296,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tDecodeStreamTask(&decoder, pTask); tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder); tDecoderClear(&decoder);
// todo set correct initial version. if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, 0) < 0) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
......
...@@ -102,8 +102,10 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp ...@@ -102,8 +102,10 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
.downstreamNodeId = pRsp->downstreamNodeId, .downstreamNodeId = pRsp->downstreamNodeId,
.childId = pRsp->childId, .childId = pRsp->childId,
}; };
qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->id.taskId, pTask->nodeId,
qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId); req.downstreamTaskId, req.downstreamNodeId);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
...@@ -116,6 +118,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp ...@@ -116,6 +118,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
} }
} }
} }
return 0; return 0;
} }
...@@ -158,9 +161,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -158,9 +161,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
} else { } else {
ASSERT(0); ASSERT(0);
} }
} else { } else { // not ready, it should wait for at least 100ms and then retry
streamRecheckOneDownstream(pTask, pRsp); streamRecheckOneDownstream(pTask, pRsp);
} }
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册