提交 41c19394 编写于 作者: L liuyao

fill history pause&resume

上级 0eeaab9c
......@@ -221,11 +221,15 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
void resetTaskInfo(qTaskInfo_t tinfo);
......
......@@ -580,6 +580,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
......@@ -588,7 +591,8 @@ int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
// source level
int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
// int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
......
......@@ -1337,10 +1337,10 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t size = taosArrayGetSize(pStream->tasks);
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) {
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
SArray *pTasks = taosArrayGetP(tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
......@@ -1352,6 +1352,16 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t code = mndPauseAllStreamTaskImpl(pTrans, pStream->tasks);
if (code != 0) {
return code;
}
// pStream->pHTasksList is null
// code = mndPauseAllStreamTaskImpl(pTrans, pStream->pHTasksList);
return code;
}
static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) {
SStreamObj streamObj = {0};
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
......@@ -1473,6 +1483,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
}
}
}
// pStream->pHTasksList is null
return 0;
}
......
......@@ -1036,9 +1036,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
......@@ -1047,13 +1050,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
if (pTask->info.fillHistory) {
SVersionRange* pRange = NULL;
SStreamTask* pStreamTask = NULL;
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
// todo handle error
}
// streamTaskReleaseState(pTask);
// streamTaskReloadState(pStreamTask);
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
......@@ -1078,13 +1082,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
SVersionRange* pRange = &pTask->dataRange.range;
pRange = &pTask->dataRange.range;
pRange->minVer = pRange->maxVer + 1;
pRange->maxVer = ver;
if (pRange->minVer == pRange->maxVer) {
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pTask->id.idStr);
} else {
streamTaskRecoverSetAllStepFinished(pTask);
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest",
pTask->id.idStr);
}
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
" do secondary scan-history-data after halt the related stream task:%s",
pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
......@@ -1092,19 +1101,23 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs();
streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window);
streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
}
if(!streamTaskRecoverScanStep2Finished(pTask)) {
streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
streamTaskRecoverSetAllStepFinished(pTask);
}
el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el);
}
if (!pTask->status.transferState) {
// 3. notify the downstream tasks to transfer executor state after handle all history blocks.
pTask->status.transferState = true;
......@@ -1112,6 +1125,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
}
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task.
......@@ -1119,7 +1133,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);
// transfer the ownership of executor state
// todo(liuyao)
// streamTaskReleaseState(pTask);
// streamTaskReloadState(pStreamTask);
streamMetaSaveTask(pMeta, pTask);
streamMetaSaveTask(pMeta, pStreamTask);
......@@ -1165,6 +1182,11 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
tqError("failed to find task:0x%x", req.taskId);
return -1;
}
// transfer the ownership of executor state
// todo(liuyao)
// streamTaskReleaseState(pTask);
// SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
// streamTaskReloadState(pStreamTask);
ASSERT(pTask->streamTaskId.taskId != 0);
pTask->status.transferState = true; // persistent data?
......@@ -1398,28 +1420,40 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
return 0;
}
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
if (pTask) {
tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
if (!streamTaskShouldPause(&pTask->status)) {
tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr);
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
}
streamMetaReleaseTask(pStreamMeta, pTask);
} else {
return -1;
}
return 0;
}
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask);
if (code != 0) {
return code;
}
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId);
code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask);
return code;
}
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
int32_t vgId = pTq->pStreamMeta->vgId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) {
if (streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version
if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
......@@ -1431,20 +1465,33 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
}
if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamStartRecoverTask(pTask, pReq->igUntreated);
streamStartRecoverTask(pTask, igUntreated);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
tqStartStreamTasks(pTq);
} else {
streamSchedExec(pTask);
}
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} else {
tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId);
return -1;
}
return 0;
}
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
if (code != 0) {
return code;
}
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId);
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
return code;
}
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
......
......@@ -62,6 +62,8 @@ typedef struct {
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep;
bool recoverStep1Finished;
bool recoverStep2Finished;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
SVersionRange fillHistoryVer;
......
......@@ -870,7 +870,7 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
......@@ -879,8 +879,29 @@ int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *p
pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
pStreamInfo->recoverStep1Finished = false;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64
qDebug("%s step 1. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
}
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
......@@ -1022,6 +1043,23 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
return pTaskInfo->streamInfo.recoverScanFinished;
}
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep1Finished;
}
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep2Finished;
}
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->streamInfo.recoverStep1Finished = true;
pTaskInfo->streamInfo.recoverStep2Finished = true;
return 0;
}
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
......
......@@ -179,7 +179,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0;
while (1) {
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}
......@@ -195,6 +195,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
finished = true;
} else {
qSetStreamOpOpen(exec);
if (streamTaskShouldPause(&pTask->status)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}
}
break;
}
......
......@@ -55,7 +55,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
pRange->minVer, pRange->maxVer);
streamSetParamForScanHistoryData(pTask);
streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window);
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartRecoverTask(pTask, 0);
return code;
......@@ -261,8 +261,12 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
// source
int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScan(pTask->exec.pExecutor, pVerRange, pWindow);
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
}
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
}
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
......@@ -512,6 +516,21 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
return 0;
}
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRecoverScanStep1Finished(exec);
}
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRecoverScanStep2Finished(exec);
}
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRecoverSetAllStepFinished(exec);
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册