提交 3d231831 编写于 作者: H Haojun Liao

Merge branch '3.0' into fix/liaohj

...@@ -45,7 +45,6 @@ enum { ...@@ -45,7 +45,6 @@ enum {
TASK_STATUS__FAIL, TASK_STATUS__FAIL,
TASK_STATUS__STOP, TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause TASK_STATUS__PAUSE, // pause
}; };
......
...@@ -1145,7 +1145,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1145,7 +1145,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 3. It's an fill history task, do nothing. wait for the main task to start it // 3. It's an fill history task, do nothing. wait for the main task to start it
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
if (p != NULL) { // reset the downstreamReady flag. if (p != NULL) { // reset the downstreamReady flag.
p->status.downstreamReady = 0;
streamTaskCheckDownstreamTasks(p); streamTaskCheckDownstreamTasks(p);
} }
...@@ -1154,12 +1153,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1154,12 +1153,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
char* msg = pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
...@@ -1167,12 +1164,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1167,12 +1164,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return -1; return -1;
} }
// do recovery step 1 // do recovery step1
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
int64_t st = taosGetTimestampMs(); if (pTask->tsInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
pTask->tsInfo.step1Start = taosGetTimestampMs();
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
}
// we have to continue retrying to successfully execute the scan history task. // we have to continue retrying to successfully execute the scan history task.
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
...@@ -1185,31 +1190,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1185,31 +1190,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
ASSERT(pTask->status.pauseAllowed == false);
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask); ASSERT(pTask->status.pauseAllowed == true);
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
streamSourceScanHistoryData(pTask);
} }
// disable the pause when handling the step2 scan of tsdb data. streamSourceScanHistoryData(pTask);
// the whole next procedure cann't be stopped. if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode. double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
if (pTask->info.fillHistory == 1) { tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
streamTaskDisablePause(pTask); TASK_SCHED_STATUS__INACTIVE);
}
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
double el = (taosGetTimestampMs() - st) / 1000.0; // the following procedure should be executed, no matter status is stop/pause or not
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
...@@ -1217,77 +1212,71 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1217,77 +1212,71 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pStreamTask = NULL; SStreamTask* pStreamTask = NULL;
bool done = false; bool done = false;
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { // 1. get the related stream task
// 1. stop the related stream task, get the current scan wal version of stream task, ver. pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) {
if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr); pTask->streamTaskId.taskId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING; tqDebug("s-task:%s fill-history task set status to be dropping", id);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask); streamMetaUnregisterTask(pMeta, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return -1;
} }
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused. // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
// wait for the stream task get ready for scan history data // stream task get ready for scan history data
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug( tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
taosMsleep(100); taosMsleep(100);
} }
// now we can stop the stream task execution // now we can stop the stream task execution
streamTaskHalt(pStreamTask); streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
// if it's an source task, extract the last version in wal. tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pRange = &pTask->dataRange.range; pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); // if it's an source task, extract the last version in wal.
} pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
if (done) { if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs(); pTask->tsInfo.step2Start = taosGetTimestampMs();
streamTaskEndScanWAL(pTask); streamTaskEndScanWAL(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
if (!streamTaskRecoverScanStep1Finished(pTask)) { STimeWindow* pWindow = &pTask->dataRange.window;
STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 ", do secondary scan-history from WAL after halt the related stream task:%s",
", do secondary scan-history from WAL after halt the related stream task:%s", id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
pTask->tsInfo.step2Start = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
}
if (!streamTaskRecoverScanStep2Finished(pTask)) { pTask->tsInfo.step2Start = taosGetTimestampMs();
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
int64_t dstVer = pTask->dataRange.range.minVer - 1; int64_t dstVer = pTask->dataRange.range.minVer - 1;
pTask->chkInfo.currentVer = dstVer; pTask->chkInfo.currentVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer, tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
TASK_SCHED_STATUS__INACTIVE); pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
}
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// set the fill-history task to be normal
if (pTask->info.fillHistory == 1) {
streamSetStatusNormal(pTask);
}
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task. // 5. resume the related stream task.
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
...@@ -1304,7 +1293,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1304,7 +1293,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug( tqDebug(
"s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time " "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64, "window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey); id, pWindow->skey, pWindow->ekey);
qResetStreamInfoTimeWindow(pTask->exec.pExecutor); qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
...@@ -1500,7 +1489,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1500,7 +1489,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask != NULL) { if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed // even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus; int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) { if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version); pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
...@@ -1637,7 +1626,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, ...@@ -1637,7 +1626,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} }
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) { if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartRecoverTask(pTask, igUntreated); streamStartRecoverTask(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
......
...@@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { ...@@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64 qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, set the transfer state flag", ", not scan wal anymore, set the transfer state flag",
pTask->id.idStr, ver, pTask->dataRange.range.maxVer); pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
pTask->status.transferState = true; pTask->status.transferState = true;
...@@ -256,14 +256,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -256,14 +256,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) { if (status != TASK_STATUS__NORMAL/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); // ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
ASSERT(status == TASK_STATUS__NORMAL);
// the maximum version of data in the WAL has reached already, the step2 is done // the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer); pTask->dataRange.range.maxVer);
......
...@@ -2121,8 +2121,7 @@ FETCH_NEXT_BLOCK: ...@@ -2121,8 +2121,7 @@ FETCH_NEXT_BLOCK:
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
SSDataBlock* pBlock = pInfo->pRes; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SDataBlockInfo* pBlockInfo = &pBlock->info;
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
NEXT_SUBMIT_BLK: NEXT_SUBMIT_BLK:
...@@ -2146,35 +2145,36 @@ FETCH_NEXT_BLOCK: ...@@ -2146,35 +2145,36 @@ FETCH_NEXT_BLOCK:
} }
} }
blockDataCleanup(pBlock); blockDataCleanup(pInfo->pRes);
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id);
id);
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
qDebug("retrieve data failed, try next block in submit block, %s", id); qDebug("retrieve data failed, try next block in submit block, %s", id);
continue; continue;
} }
setBlockIntoRes(pInfo, pRes, false); // filter the block extracted from WAL files, according to the time window
// apply additional time window filter
doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pRes->info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, pRes, false);
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id); qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
// apply additional time window filter doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
...@@ -2196,7 +2196,7 @@ FETCH_NEXT_BLOCK: ...@@ -2196,7 +2196,7 @@ FETCH_NEXT_BLOCK:
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id); qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
return pBlock; return pInfo->pRes;
} }
if (pInfo->pUpdateDataRes->info.rows > 0) { if (pInfo->pUpdateDataRes->info.rows > 0) {
......
...@@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
bool finished = false; bool finished = false;
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return 0;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -404,6 +410,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -404,6 +410,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
...@@ -414,6 +422,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -414,6 +422,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// save to disk // save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
...@@ -615,7 +624,7 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -615,7 +624,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here // todo the task should be commit here
if (taosQueueEmpty(pTask->inputQueue->queue)) { if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed // fill-history WAL scan has completed
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
streamTaskRecoverSetAllStepFinished(pTask); streamTaskRecoverSetAllStepFinished(pTask);
streamTaskEndScanWAL(pTask); streamTaskEndScanWAL(pTask);
} else { } else {
......
...@@ -85,6 +85,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { ...@@ -85,6 +85,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask); streamSetParamForScanHistory(pTask);
} }
streamTaskEnablePause(pTask);
streamTaskScanHistoryPrepare(pTask); streamTaskScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
...@@ -839,7 +840,7 @@ void streamTaskPause(SStreamTask* pTask) { ...@@ -839,7 +840,7 @@ void streamTaskPause(SStreamTask* pTask) {
return; return;
} }
while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
status = pTask->status.taskStatus; status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) { if (status == TASK_STATUS__DROPPING) {
qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
...@@ -856,8 +857,19 @@ void streamTaskPause(SStreamTask* pTask) { ...@@ -856,8 +857,19 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep(100); taosMsleep(100);
} }
// todo: use the lock of the task.
taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
taosWUnLockLatch(&pMeta->lock);
qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
taosWUnLockLatch(&pMeta->lock);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册