提交 db46d31c 编写于 作者: H Haojun Liao

fix(stream): refactor the halt function.

上级 9353f7c8
......@@ -45,7 +45,7 @@ enum {
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused, todo remove it?
TASK_STATUS__PAUSE, // pause
};
......@@ -272,6 +272,7 @@ typedef struct SStreamStatus {
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus;
typedef struct SHistDataRange {
......@@ -557,7 +558,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
......@@ -593,6 +593,9 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
......
......@@ -25,6 +25,7 @@
#define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode;
static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
......@@ -267,10 +268,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
return terrno;
}
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pTask, pSinkTask);
}
return TSDB_CODE_SUCCESS;
}
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -295,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -418,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return -1;
}
return setEpToDownstreamTask(pTask, pDownstreamTask);
return setTaskUpstreamEpInfo(pTask, pDownstreamTask);
}
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
......@@ -586,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return TSDB_CODE_SUCCESS;
}
static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) {
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask);
}
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
......@@ -637,6 +651,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
return code;
}
setSinkTaskUpstreamInfo(pStream->tasks, pAggTask);
setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
// source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
} else if (numOfPlanLevel == 1) {
......
......@@ -914,6 +914,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
}
// reset the task status from unfinished transaction
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__NORMAL;
}
streamSetupScheduleTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
......@@ -1031,9 +1037,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
// 2.save task, use the newest commit version as the initial start version of stream task.
int32_t taskId = 0;
taosWLockLatch(&pStreamMeta->lock);
code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
taskId = pTask->id.taskId;
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
......@@ -1046,7 +1054,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
// 3. It's an fill history task, do nothing. wait for the main task to start it
streamTaskCheckDownstreamTasks(pTask);
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
if (p != NULL) {
streamTaskCheckDownstreamTasks(pTask);
}
streamMetaReleaseTask(pStreamMeta, p);
return 0;
}
......@@ -1073,7 +1086,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING);
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
ASSERT(0);
tqDebug("s-task:%s failed to launch scan history data in current time window, unexpected sched status:%d", id,
schedStatus);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
......@@ -1215,9 +1231,11 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already", req.taskId);
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.taskId);
return -1;
}
......@@ -1406,17 +1424,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
return 0;
}
// todo rule out the status when pause not suitable.
static int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
if (!streamTaskShouldPause(&pTask->status)) {
tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr);
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
return 0;
}
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
......@@ -1425,30 +1432,29 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
int32_t code = tqProcessTaskPauseImpl(pMeta, pTask);
if (code != 0) {
streamMetaReleaseTask(pMeta, pTask);
return code;
}
streamTaskPause(pTask);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId,
pTask->historyTaskId.taskId);
tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success",
pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s fill-history task handle pause along with related stream task", pHistoryTask->id.idStr);
code = tqProcessTaskPauseImpl(pMeta, pHistoryTask);
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
......@@ -1456,7 +1462,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
streamMetaReleaseTask(pMeta, pHistoryTask);
}
return code;
return TSDB_CODE_SUCCESS;
}
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
......
......@@ -365,6 +365,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr);
}
// todo fix race condition
streamTaskDisablePause(pTask);
streamTaskDisablePause(pStreamTask);
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
......@@ -420,6 +424,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
}
taosWUnLockLatch(&pMeta->lock);
// pause allowed
streamTaskEnablePause(pStreamTask);
streamTaskEnablePause(pTask);
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_SUCCESS;
......@@ -568,22 +576,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
bool streamTaskIsIdle(const SStreamTask* pTask) {
int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue);
if (numOfItems > 0) {
return false;
}
numOfItems = taosQallItemSize(pTask->inputQueue->qall);
if (numOfItems > 0) {
return false;
}
// blocked by downstream task
if (pTask->outputInfo.status == TASK_OUTPUT_STATUS__BLOCKED) {
return false;
}
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE/* && pTask->status.taskStatus != TASK_STATUS__HALT*/);
}
int32_t streamTryExec(SStreamTask* pTask) {
......
......@@ -72,7 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
return doLaunchScanHistoryTask(pTask);
int32_t code = doLaunchScanHistoryTask(pTask);
streamTaskEnablePause(pTask);
return code;
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
......@@ -86,6 +88,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
}
streamTaskEnablePause(pTask);
return 0;
}
......@@ -198,6 +201,11 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
}
// enable pause when init completed.
if (pTask->historyTaskId.taskId == 0 && pTask->info.fillHistory == 0) {
streamTaskEnablePause(pTask);
}
// when current stream task is ready, check the related fill history task.
launchFillHistoryTask(pTask);
}
......@@ -415,8 +423,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
// agg
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
pTask->numOfWaitingUpstream);
qDebug("s-task:%s agg task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr,
pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus));
return 0;
}
......@@ -745,7 +753,6 @@ void launchFillHistoryTask(SStreamTask* pTask) {
streamLaunchFillHistoryTask(pTask);
}
// todo handle race condition, this task may be destroyed
void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
if (pTask->info.fillHistory) {
qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
......@@ -757,3 +764,37 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
// check downstream tasks for itself
streamTaskDoCheckDownstreamTasks(pTask);
}
void streamTaskPause(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int64_t st = taosGetTimestampMs();
while(!pTask->status.pauseAllowed) {
qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId);
taosMsleep(100);
}
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.keepTaskStatus), el);
}
// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
// pre-condition check
while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
taosMsleep(10);
qDebug("s-task:%s already in pause, wait for pause being cancelled, and then set pause disabled", pTask->id.idStr);
}
qDebug("s-task:%s disable task pause", pTask->id.idStr);
pTask->status.pauseAllowed = 0;
}
void streamTaskEnablePause(SStreamTask* pTask) {
qDebug("s-task:%s enable task pause", pTask->id.idStr);
pTask->status.pauseAllowed = 1;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册