提交 7c6fbd77 编写于 作者: H Haojun Liao

enh(stream): do some internal refactor and support secondary scan for history data.

上级 11f0c3b3
......@@ -220,11 +220,11 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
void resetTaskInfo(qTaskInfo_t tinfo);
......
......@@ -451,7 +451,7 @@ typedef struct {
SMsgHead msgHead;
int64_t streamId;
int32_t taskId;
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
} SStreamScanHistoryReq, SStreamRecoverStep2Req;
typedef struct {
int64_t streamId;
......@@ -582,9 +582,9 @@ int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
// source level
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq);
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
......
......@@ -1066,10 +1066,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
char* msg = pMsg->pCont;
int32_t msgLen = pMsg->contLen;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) {
......@@ -1089,8 +1088,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
int64_t st = taosGetTimestampMs();
// todo set the correct status flag
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING);
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
......@@ -1098,7 +1095,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
streamSourceRecoverScanStep1(pTask);
streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
......@@ -1124,21 +1121,47 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(100);
}
taosMsleep(10000);
// now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pTask->id.idStr);
// if it's an source task, extract the last version in wal.
int64_t ver = pTask->dataRange.range.maxVer;
int64_t ver = pTask->dataRange.range.maxVer + 1;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
if (latestVer >= ver) {
ver = latestVer;
} else {
ASSERT(latestVer == -1);
}
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
SVersionRange* pRange = &pTask->dataRange.range;
pRange->minVer = pRange->maxVer + 1;
pRange->maxVer = ver;
if (pRange->minVer == pRange->maxVer) {
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pTask->id.idStr);
} else {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
" do secondary scan-history-data after halt the related stream task:%s",
pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs();
streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window);
streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el);
}
// 3. notify the downstream tasks to transfer executor state after handle all history blocks.
pTask->status.transferState = true;
......@@ -1177,7 +1200,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
return 0;
}
// notify the downstream tasks to transfer executor state after handle all history blocks.
......
......@@ -92,6 +92,7 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
qError("join not supported for stream block scan, %s" PRIx64, id);
return TSDB_CODE_APP_ERROR;
}
pOperator->status = OP_NOT_OPENED;
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}
......@@ -869,12 +870,20 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.fillHistoryVer = *pVerRange;
pTaskInfo->streamInfo.fillHistoryWindow = *pWindow;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
qDebug("%s set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
}
......@@ -893,55 +902,58 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
return 0;
}
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
qInfo("save stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
qInfo("save stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
qInfo("save stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0);
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX;
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
}
......@@ -962,7 +974,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
return 0;
}
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
......
......@@ -36,11 +36,11 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
pRange->minVer, pRange->maxVer);
streamSetParamForScanHistoryData(pTask);
streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window);
SStreamRecoverStep1Req req;
SStreamScanHistoryReq req;
streamBuildSourceRecover1Req(pTask, &req);
int32_t len = sizeof(SStreamRecoverStep1Req);
int32_t len = sizeof(SStreamScanHistoryReq);
void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) {
......@@ -242,13 +242,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
// common
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamSetParamForRecover(exec);
qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
}
int32_t streamRestoreParam(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRestoreParam(exec);
qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr);
return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
}
int32_t streamSetStatusNormal(SStreamTask* pTask) {
......@@ -258,19 +258,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
// source
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
void* exec = pTask->exec.pExecutor;
return qStreamSourceRecoverStep1(exec, pVerRange, pWindow);
int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScan(pTask->exec.pExecutor, pVerRange, pWindow);
}
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq) {
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId;
return 0;
}
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
return streamScanExec(pTask, 100);
}
......@@ -393,7 +392,7 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
if (qStreamRestoreParam(exec) < 0) {
if (qRestoreStreamOperatorOption(exec) < 0) {
return -1;
}
if (qStreamRecoverFinish(exec) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册