提交 c5b279f0 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

......@@ -409,6 +409,7 @@ typedef struct SStreamMeta {
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
int32_t pauseTaskNum;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
......@@ -673,8 +674,8 @@ int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskHalt(SStreamTask* pTask);
void streamTaskResumeFromHalt(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask);
......
......@@ -1635,7 +1635,7 @@ int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
if (mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
......@@ -1775,7 +1775,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
......
......@@ -223,7 +223,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqStartStreamTasks(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -1199,7 +1199,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamSetStatusNormal(pTask);
}
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, false);
}
streamMetaReleaseTask(pMeta, pTask);
......@@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, false);
return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this
......@@ -1448,7 +1448,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
streamTaskPause(pTask);
streamTaskPause(pTask, pMeta);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
......@@ -1464,7 +1464,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask);
streamTaskPause(pHistoryTask, pMeta);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
......@@ -1479,9 +1479,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
}
// todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal
streamTaskResume(pTask);
streamTaskResume(pTask, pTq->pStreamMeta);
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
// no lock needs to secure the access of the version
......@@ -1500,7 +1505,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, false);
} else {
streamSchedExec(pTask);
}
......
......@@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
tqStartStreamTasks(pTq);
tqStartStreamTasks(pTq, true);
}
return 0;
......
......@@ -121,7 +121,7 @@ int32_t tqCheckStreamStatus(STQ* pTq) {
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
......@@ -147,6 +147,13 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return 0;
}
int32_t numOfPauseTasks = pTq->pStreamMeta->pauseTaskNum;
if (ckPause && numOfTasks == numOfPauseTasks) {
tqDebug("ignore all submit, all streams had been paused");
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -403,7 +403,11 @@ static void doRetryDispatchData(void* param, void* tmrId) {
if (!streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
if (streamTaskShouldPause(&pTask->status)) {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
} else {
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
......
......@@ -203,6 +203,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
pMeta->pauseTaskNum = 0;
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
return pMeta;
......@@ -486,6 +488,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask) {
pTask = *ppTask;
if (streamTaskShouldPause(&pTask->status)) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
}
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
......@@ -685,8 +691,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
return -1;
}
if (streamTaskShouldPause(&pTask->status)) {
atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
}
ASSERT(pTask->status.downstreamReady == 0);
}
qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum);
tdbFree(pKey);
tdbFree(pVal);
......
......@@ -811,9 +811,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
}
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
void streamTaskPause(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
int64_t st = taosGetTimestampMs();
int8_t status = pTask->status.taskStatus;
......@@ -828,6 +826,12 @@ void streamTaskPause(SStreamTask* pTask) {
return;
}
if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
return;
}
while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
......@@ -857,6 +861,8 @@ void streamTaskPause(SStreamTask* pTask) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
taosWUnLockLatch(&pMeta->lock);
// in case of fill-history task, stop the tsdb file scan operation.
......@@ -870,12 +876,16 @@ void streamTaskPause(SStreamTask* pTask) {
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
}
void streamTaskResume(SStreamTask* pTask) {
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s resume from pause", pTask->id.idStr);
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s sink task.resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else {
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册