提交 161fd690 编写于 作者: H Haojun Liao

fix(stream): fix the acquire task.

上级 f633c7fe
...@@ -1634,7 +1634,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1634,7 +1634,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tDeleteStreamDispatchReq(&req); // tDeleteStreamDispatchReq(&req);
return -1; return -1;
} }
...@@ -1741,7 +1741,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1741,7 +1741,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
// todo handle the case when the task not in ready state, and the checkpoint msg is arrived. // todo handle the case when the task not in ready state, and the checkpoint msg is arrived.
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
req.taskId); req.taskId);
...@@ -1798,7 +1798,7 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1798,7 +1798,7 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId); tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
return code; return code;
...@@ -1831,7 +1831,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1831,7 +1831,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId); req.taskId);
...@@ -1844,7 +1844,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1844,7 +1844,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pHistoryTask = NULL; SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError( tqError(
"vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped " "vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped "
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册