提交 4f534ce6 编写于 作者: H Haojun Liao

fix(stream): fix error in generating checkpoint.

上级 a5c19427
...@@ -621,7 +621,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); ...@@ -621,7 +621,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq); int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask);
......
...@@ -1409,18 +1409,17 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1409,18 +1409,17 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask == NULL) {
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tDeleteStreamRetrieveReq(&req);
return 0;
} else {
tDeleteStreamRetrieveReq(&req); tDeleteStreamRetrieveReq(&req);
return -1; return -1;
} }
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tDeleteStreamRetrieveReq(&req);
return 0;
} }
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
...@@ -1579,7 +1578,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1579,7 +1578,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) {
goto FAIL; goto FAIL;
} }
streamProcessCheckpointReq(pMeta, pTask, &req); streamProcessCheckpointReq(pTask, &req);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
...@@ -1593,7 +1592,6 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1593,7 +1592,6 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
// if this task is an source task, send source rsp to mnode // if this task is an source task, send source rsp to mnode
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0; int32_t code = 0;
......
...@@ -210,7 +210,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, ...@@ -210,7 +210,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
return code; return code;
} }
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) { int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) {
int32_t code; int32_t code;
int64_t checkpointId = pReq->checkpointId; int64_t checkpointId = pReq->checkpointId;
int32_t childId = pReq->childId; int32_t childId = pReq->childId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册