提交 16ee72ab 编写于 作者: H Haojun Liao

fix(stream): free msg after send checkpoint rsp.

上级 f6515e2a
...@@ -752,6 +752,7 @@ SArray *vmGetMsgHandles() { ...@@ -752,6 +752,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -487,6 +487,8 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) { ...@@ -487,6 +487,8 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) {
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
} }
taosArrayClear(pTask->pRpcMsgList);
int8_t prev = pTask->status.taskStatus; int8_t prev = pTask->status.taskStatus;
pTask->status.taskStatus = TASK_STATUS__NORMAL; pTask->status.taskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream, set status:%s, prev:%s", pTask->id.idStr, qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream, set status:%s, prev:%s", pTask->id.idStr,
...@@ -502,6 +504,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { ...@@ -502,6 +504,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosArrayClear(pTask->pRpcMsgList);
int8_t prev = pTask->status.taskStatus; int8_t prev = pTask->status.taskStatus;
pTask->status.taskStatus = TASK_STATUS__NORMAL; pTask->status.taskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode, set status:%s, prev:%s", pTask->id.idStr, qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode, set status:%s, prev:%s", pTask->id.idStr,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册