提交 e89f530d 编写于 作者: H Haojun Liao

fix(stream): scan wal in step2

上级 a4e19013
...@@ -45,6 +45,7 @@ enum { ...@@ -45,6 +45,7 @@ enum {
TASK_STATUS__FAIL, TASK_STATUS__FAIL,
TASK_STATUS__STOP, TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause TASK_STATUS__PAUSE, // pause
}; };
...@@ -302,6 +303,12 @@ typedef struct { ...@@ -302,6 +303,12 @@ typedef struct {
SStreamQueue* queue; SStreamQueue* queue;
} STaskOutputInfo; } STaskOutputInfo;
typedef struct {
int64_t init;
int64_t step1Start;
int64_t step2Start;
} STaskTimestamp;
struct SStreamTask { struct SStreamTask {
SStreamId id; SStreamId id;
SSTaskBasicInfo info; SSTaskBasicInfo info;
...@@ -316,7 +323,7 @@ struct SStreamTask { ...@@ -316,7 +323,7 @@ struct SStreamTask {
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId; int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo> SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
int64_t initTs; STaskTimestamp tsInfo;
// output // output
union { union {
STaskDispatcherFixedEp fixedEpDispatcher; STaskDispatcherFixedEp fixedEpDispatcher;
......
...@@ -72,7 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -72,7 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
return -1; return -1;
} }
pTask->initTs = taosGetTimestampMs(); pTask->tsInfo.init = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb; pTask->pMsgCb = &pSnode->msgCb;
......
...@@ -818,7 +818,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -818,7 +818,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return -1; return -1;
} }
pTask->initTs = taosGetTimestampMs(); pTask->tsInfo.init = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMsgCb = &pTq->pVnode->msgCb;
...@@ -1115,7 +1115,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1115,7 +1115,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamTaskDisablePause(pTask); streamTaskDisablePause(pTask);
} }
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
...@@ -1123,7 +1123,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1123,7 +1123,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
SVersionRange* pRange = NULL; SVersionRange* pRange = NULL;
...@@ -1173,34 +1173,35 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1173,34 +1173,35 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs(); pTask->tsInfo.step2Start = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
} }
if (!streamTaskRecoverScanStep2Finished(pTask)) { if (!streamTaskRecoverScanStep2Finished(pTask)) {
streamSourceScanHistoryData(pTask); pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
streamTaskRecoverSetAllStepFinished(pTask); int64_t dstVer = pTask->dataRange.range.minVer - 1;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s seek wal reader to ver:%"PRId64, id, dstVer);
} }
el = (taosGetTimestampMs() - st) / 1000.0; // int64_t el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); // tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el);
//
// 3. notify downstream tasks to transfer executor state after handle all history blocks. // // 3. notify downstream tasks to transfer executor state after handle all history blocks.
if (!pTask->status.transferState) { // if (!pTask->status.transferState) {
code = streamDispatchTransferStateMsg(pTask); // code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
// todo handle error // // todo handle error
} // }
//
pTask->status.transferState = true; // pTask->status.transferState = true;
} // }
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task. // 5. resume the related stream task.
...@@ -1409,8 +1410,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1409,8 +1410,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) { if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed // even in halt status, the data in inputQ must be processed
int8_t status = pTask->status.taskStatus; int8_t st = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version); pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
......
...@@ -247,7 +247,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -247,7 +247,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
if (status != TASK_STATUS__NORMAL) { if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
...@@ -261,6 +261,17 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -261,6 +261,17 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false; *pScanIdle = false;
if (pTask->info.fillHistory == 1) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL);
// the maximum version of data in the WAL has reached already, the step2 is done
if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore",
pTask->id.idStr, pTask->chkInfo.currentVer);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
}
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int32_t code = doSetOffsetForWalReader(pTask, vgId); int32_t code = doSetOffsetForWalReader(pTask, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -283,9 +294,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -283,9 +294,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
noDataInWal = false; noDataInWal = false;
code = tAppendDataToInputQueue(pTask, pItem); code = tAppendDataToInputQueue(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, pTask->chkInfo.currentVer = ver;
pTask->chkInfo.currentVer); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
} else { } else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
pTask->chkInfo.currentVer); pTask->chkInfo.currentVer);
......
...@@ -324,9 +324,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { ...@@ -324,9 +324,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
return -1; return -1;
} }
/*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
/*streamDispatchStreamBlock(pTask);*/
/*}*/
return 0; return 0;
} }
......
...@@ -589,6 +589,8 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -589,6 +589,8 @@ int32_t streamTryExec(SStreamTask* pTask) {
int8_t schedStatus = int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
const char* id = pTask->id.idStr;
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask); int32_t code = streamExecForAll(pTask);
if (code < 0) { // todo this status shoudl be removed if (code < 0) { // todo this status shoudl be removed
...@@ -597,16 +599,43 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -597,16 +599,43 @@ int32_t streamTryExec(SStreamTask* pTask) {
} }
// todo the task should be commit here // todo the task should be commit here
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); if (taosQueueEmpty(pTask->inputQueue->queue)) {
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL &&
pTask->status.schedStatus); pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) {
// fill-history WAL scan has completed
streamTaskRecoverSetAllStepFinished(pTask);
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
// 3. notify downstream tasks to transfer executor state after handle all history blocks.
if (!pTask->status.transferState) {
code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
pTask->status.transferState = true;
}
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && // the last execution of fill-history task, in order to transfer task operator states.
(!streamTaskShouldPause(&pTask->status))) { code = streamExecForAll(pTask);
streamSchedExec(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
}
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
} }
} else { } else {
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", pTask->id.idStr, qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
} }
......
...@@ -23,7 +23,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); ...@@ -23,7 +23,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1; pTask->status.downstreamReady = 1;
int64_t el = (taosGetTimestampMs() - pTask->initTs); int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
...@@ -663,7 +663,7 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { ...@@ -663,7 +663,7 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
// no input data yet. no need to execute the secondardy scan while stream task halt // no input data yet. no need to execute the secondardy scan while stream task halt
streamTaskRecoverSetAllStepFinished(pTask); streamTaskRecoverSetAllStepFinished(pTask);
qDebug( qDebug(
"s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during secondary scan",
pTask->id.idStr); pTask->id.idStr);
} else { } else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册