提交 4e0f7ffb 编写于 作者: H Haojun Liao

fix(stream): set the correct flag of pause/resume.

上级 9679cc45
......@@ -45,7 +45,7 @@ enum {
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused, todo remove it?
TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause
};
......
......@@ -925,7 +925,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
streamSetupScheduleTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms, disable pause",
" child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms, disable pause",
vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
pTask->info.fillHistory, pTask->triggerParam);
......@@ -1099,10 +1099,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(100);
}
ASSERT(pTask->status.pauseAllowed == false);
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
streamSourceScanHistoryData(pTask);
}
if (pTask->info.fillHistory == 1) {
streamTaskDisablePause(pTask);
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
......@@ -1114,8 +1124,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) {
streamTaskEnablePause(pTask);
SVersionRange* pRange = NULL;
SStreamTask* pStreamTask = NULL;
......@@ -1123,7 +1131,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill history task:%s",
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
......@@ -1136,6 +1144,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
// wait for the stream task get ready for scan history data
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
......@@ -1146,7 +1155,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
// todo disable the pause
tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, id);
// if it's an source task, extract the last version in wal.
......@@ -1392,7 +1403,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__SCAN_HISTORY) {
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version);
streamProcessRunReq(pTask);
......@@ -1599,9 +1610,8 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) {
if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, false);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
......@@ -1609,18 +1619,22 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
taosFreeQitem(pMsg);
return 0;
} else {
tDeleteStreamDispatchReq(&req);
}
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
FAIL:
if (pMsg->info.handle == NULL) return -1;
if (pMsg->info.handle == NULL) {
tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId);
return -1;
}
SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (pRspHead == NULL) {
SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info};
tqDebug("send dispatch error rsp, code: %x", code);
tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code));
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
......@@ -1636,9 +1650,10 @@ FAIL:
pRsp->downstreamTaskId = htonl(req.taskId);
pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;
SRpcMsg rsp = {
.code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead};
tqDebug("send dispatch error rsp, code: %x", code);
int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
SRpcMsg rsp = { .code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code));
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
......
......@@ -240,7 +240,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
int32_t status = pTask->status.taskStatus;
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
// non-source or fill-history tasks don't need to response the WAL scan action.
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......
......@@ -262,14 +262,15 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast. todo handle the shuffle dispatch failure
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no-retry", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code));
return code;
} else {
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
return streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
}
return TSDB_CODE_SUCCESS;
}
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
......@@ -359,6 +360,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return -1;
}
int32_t msgLen = px->submit.msgLen;
int64_t ver = px->submit.ver;
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
if (code != TSDB_CODE_SUCCESS) {
streamDataSubmitDestroy(px);
......@@ -366,8 +370,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return code;
}
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0);
msgLen, ver, total, size + msgLen/1048576.0);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
......
......@@ -29,7 +29,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus) {
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE);
return (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT);
}
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
......@@ -365,10 +365,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr);
}
// todo fix race condition
streamTaskDisablePause(pTask);
streamTaskDisablePause(pStreamTask);
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
......@@ -426,7 +422,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// pause allowed
streamTaskEnablePause(pStreamTask);
streamTaskEnablePause(pTask);
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
......
......@@ -141,11 +141,6 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask);
// enable pause when init completed.
if (pTask->historyTaskId.taskId == 0) {
streamTaskEnablePause(pTask);
}
launchFillHistoryTask(pTask);
}
......@@ -473,7 +468,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
streamNotifyUpstreamContinue(pTask);
// sink node does not receive the pause msg from mnode
// sink node does not receive the pause msg from mnode, so does not need enable it
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamTaskEnablePause(pTask);
}
......@@ -497,6 +492,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
streamMetaSaveTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
// history data scan in the stream time window finished, now let's enable the pause
streamTaskEnablePause(pTask);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
......@@ -758,9 +754,15 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
if (pTask->historyTaskId.taskId == 0) {
SHistDataRange* pRange = &pTask->dataRange;
qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
" - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
if (pTask->info.fillHistory == 1) {
qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
"-%" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
} else {
qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
"-%" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
} else {
SHistDataRange* pRange = &pTask->dataRange;
......@@ -808,7 +810,25 @@ void streamTaskPause(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int64_t st = taosGetTimestampMs();
while(!pTask->status.pauseAllowed) {
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
const char* str = streamGetTaskStatusStr(status);
if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
return;
}
while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId);
taosMsleep(100);
}
......@@ -826,8 +846,8 @@ void streamTaskDisablePause(SStreamTask* pTask) {
// pre-condition check
const char* id = pTask->id.idStr;
while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
taosMsleep(100);
qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, check in 100ms", id);
}
qDebug("s-task:%s disable task pause", id);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册