提交 255af841 编写于 作者: 5 54liuyao

resume option

上级 10582fb5
...@@ -2886,6 +2886,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq ...@@ -2886,6 +2886,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int32_t taskId; int32_t taskId;
int8_t igUntreated;
} SVResumeStreamTaskReq; } SVResumeStreamTaskReq;
typedef struct { typedef struct {
......
...@@ -1402,7 +1402,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { ...@@ -1402,7 +1402,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
} }
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1410,10 +1410,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { ...@@ -1410,10 +1410,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) {
} }
pReq->head.vgId = htonl(pTask->nodeId); pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->igUntreated = igUntreated;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SVPauseStreamTaskReq); action.contLen = sizeof(SVResumeStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_RESUME; action.msgType = TDMT_STREAM_TASK_RESUME;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
...@@ -1422,14 +1423,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) { ...@@ -1422,14 +1423,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0; return 0;
} }
int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks); int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i); SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks); int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) { for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j); SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndResumeStreamTask(pTrans, pTask) < 0) { if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1; return -1;
} }
} }
...@@ -1441,8 +1442,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { ...@@ -1441,8 +1442,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
SMPauseStreamReq pauseReq = {0}; SMResumeStreamReq pauseReq = {0};
if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -1481,7 +1482,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { ...@@ -1481,7 +1482,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
// resume all tasks // resume all tasks
if (mndResumeAllStreamTasks(pTrans, pStream) < 0) { if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册