提交 64ef48e0 编写于 作者: K kailixu

Merge branch '3.0' into enh/TD-23769-3.0

......@@ -55,4 +55,4 @@ def taos_get_assignment_and_seek_demo():
if __name__ == '__main__':
taosws_get_assignment_and_seek_demo()
taos_get_assignment_and_seek_demo()
......@@ -459,10 +459,10 @@ typedef struct {
int64_t streamId;
int32_t taskId;
int32_t childId;
} SStreamRecoverFinishReq, SStreamTransferReq;
} SStreamScanHistoryFinishReq, SStreamTransferReq;
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
typedef struct {
int64_t streamId;
......@@ -537,8 +537,8 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskScanHistoryReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskScanHistoryReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
......@@ -578,15 +578,17 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask);
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
int32_t streamSetParamForScanHistory(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
......@@ -596,32 +598,29 @@ int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* p
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
// int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
// stream task meta
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint
......
......@@ -279,11 +279,11 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamRecoverFinishReq req;
SStreamScanHistoryFinishReq req;
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
tDecodeStreamRecoverFinishReq(&decoder, &req);
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
// find task
......@@ -292,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
return -1;
}
// do process request
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1;
}
......
......@@ -248,7 +248,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
......@@ -1071,13 +1071,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
// check param
int64_t fillVer1 = pTask->chkInfo.version;
if (fillVer1 <= 0) {
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
// do recovery step 1
const char* pId = pTask->id.idStr;
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
......@@ -1091,7 +1084,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
if (!streamTaskRecoverScanStep1Finished(pTask)) {
streamSourceScanHistoryData(pTask);
}
......@@ -1121,39 +1114,23 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// wait for the stream task get ready for scan history data
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms",
pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr);
tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId,
pTask->info.taskLevel, pId);
taosMsleep(100);
}
// now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId,
pStreamTask->info.taskLevel, pId);
// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
ASSERT(latestVer >= pRange->maxVer);
int64_t nextStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) {
// no input data yet. no need to execute the secondardy scan while stream task halt
streamTaskRecoverSetAllStepFinished(pTask);
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", pId);
} else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
// [pTask->dataRange.range.maxVer, ver1]
pRange->minVer = nextStartVer;
pRange->maxVer = latestVer - 1;
}
streamHistoryTaskSetVerRangeStep2(pTask);
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
" do secondary scan-history-data after halt the related stream task:%s",
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs();
......@@ -1162,6 +1139,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (!streamTaskRecoverScanStep2Finished(pTask)) {
streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
streamMetaReleaseTask(pMeta, pTask);
......@@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);
// 3. notify the downstream tasks to transfer executor state after handle all history blocks.
// 3. notify downstream tasks to transfer executor state after handle all history blocks.
if (!pTask->status.transferState) {
code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1210,14 +1188,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug("s-task:%s no associated task, reset the time window:%" PRId64 " - %" PRId64, pId, pWindow->skey,
pWindow->ekey);
tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId,
pWindow->skey, pWindow->ekey);
} else {
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
", window:%" PRId64 " - %" PRId64,
pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
tqDebug(
"s-task:%s history data in current time window scan completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
}
// notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask);
......@@ -1238,7 +1218,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
......@@ -1251,61 +1231,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
streamTaskReleaseState(pTask);
tqDebug("s-task:%s receive state transfer req", pTask->id.idStr);
// related stream task load the state from the state storage backend
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
streamTaskReloadState(pStreamTask);
ASSERT(pTask->streamTaskId.taskId != 0);
pTask->status.transferState = true; // persistent data?
#if 0
// do check if current task handle all data in the input queue
int64_t st = taosGetTimestampMs();
tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);
code = streamSourceRecoverScanStep2(pTask, sversion);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
}
qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);
walReaderSeekVer(pTask->exec.pWalReader, sversion);
pTask->chkInfo.currentVer = sversion;
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// restore param
code = streamRestoreParam(pTask);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
}
// set status normal
tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
code = streamSetStatusNormal(pTask);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
if (pStreamTask == NULL) {
tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
return -1;
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
// dispatch recover finish req to all related downstream task
code = streamDispatchScanHistoryFinishMsg(pTask);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
}
streamTaskReloadState(pStreamTask);
atomic_store_8(&pTask->info.fillHistory, 0);
streamMetaSaveTask(pTq->pStreamMeta, pTask);
#endif
ASSERT(pTask->streamTaskId.taskId != 0);
pTask->status.transferState = true;
streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
......@@ -1313,31 +1249,28 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
return 0;
}
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamRecoverFinishReq req;
SStreamScanHistoryFinishReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeStreamRecoverFinishReq(&decoder, &req);
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
// find task
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask == NULL) {
return -1;
}
// do process request
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId);
return -1;
}
int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
return code;
}
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
......@@ -1423,10 +1356,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
pTask->chkInfo.version);
streamProcessRunReq(pTask);
} else {
// if (streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// }
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
}
......@@ -1517,34 +1447,35 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
int32_t vgId = pTq->pStreamMeta->vgId;
if (pTask) {
if (streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version
if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (pTask == NULL) {
return -1;
}
if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamStartRecoverTask(pTask, igUntreated);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
tqStartStreamTasks(pTq);
} else {
streamSchedExec(pTask);
}
if (streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version
if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamStartRecoverTask(pTask, igUntreated);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
tqStartStreamTasks(pTq);
} else {
streamSchedExec(pTask);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} else {
return -1;
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
......@@ -1560,6 +1491,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
}
return code;
}
......
......@@ -665,7 +665,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
}
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
default:
......
......@@ -911,8 +911,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64
", window:%" PRId64 " - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
......@@ -999,31 +999,35 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
const char* id = GET_TASKID(pTaskInfo);
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
uint16_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
qInfo("%s restore stream agg executors param for interval: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
pInfo->twAggSup.deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
qInfo("%s restore stream agg executor param for session: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
pInfo->twAggSup.deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
qInfo("%s restore stream agg executor param for state: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
pInfo->twAggSup.deleteMark);
}
// iterate operator tree
......@@ -1037,7 +1041,6 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
pOperator = pOperator->pDownstream[0];
}
}
return 0;
}
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
......
......@@ -49,7 +49,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
#include "streamInt.h"
#include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
......
......@@ -16,7 +16,7 @@
#include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
#include "streamInc.h"
#include "streamInt.h"
#include "tcommon.h"
#include "tref.h"
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
#include "streamInt.h"
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInt.h"
#include "ttimer.h"
#include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
......@@ -276,14 +276,14 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
......@@ -299,7 +299,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) {
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
#include "streamInt.h"
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
......@@ -358,7 +358,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// here we need to wait for the stream task handle all data in the input queue.
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2. For a agg task
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
} else {
......@@ -369,21 +370,18 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// wait for the stream task to be idle
waitForTaskIdle(pTask, pStreamTask);
// In case of sink tasks, no need to be halted for them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
// When a task is idle with halt status, all data in inputQ are consumed.
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d",
", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
// todo transfer state
} else {
// for sink tasks, they are continue to execute, no need to be halt.
// the process should be stopped for a while, during the term of transfer task state.
// OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
// todo transfer state
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
}
// expand the query time window for stream scanner
......@@ -401,81 +399,78 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
/**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
*/
int32_t streamExecForAll(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
const char* id) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
while (1) {
int32_t batchSize = 1;
int16_t times = 0;
SStreamQueueItem* pInput = NULL;
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
if (streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
while (1) {
// downstream task's input queue is blocked, stop immediately
if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
streamTaskShouldStop(&pTask->status)) {
if (batchSize > 1) {
break;
} else {
qDebug("123 %s", pTask->id.idStr);
return 0;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
continue;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
times++;
taosMsleep(10);
qDebug("===stream===try again batchSize:%d", batchSize);
continue;
}
qDebug("===stream===break batchSize:%d", *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
qDebug("===stream===break batchSize:%d", batchSize);
break;
}
// do not merge blocks for sink node
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
}
if (pInput == NULL) {
pInput = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
batchSize++;
pInput = newRet;
streamQueueProcessSuccess(pTask->inputQueue);
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
MAX_STREAM_EXEC_BATCH_NUM);
break;
}
}
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
*pInput = newRet;
}
if (streamTaskShouldStop(&pTask->status)) {
if (pInput) {
streamFreeQitem(pInput);
}
return 0;
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
return TSDB_CODE_SUCCESS;
}
}
}
/**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
*/
int32_t streamExecForAll(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
while (1) {
int32_t batchSize = 0;
SStreamQueueItem* pInput = NULL;
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
if (pInput == NULL) {
ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask);
}
......@@ -534,8 +529,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, batchSize, el, resSize / 1048576.0, totalBlocks);
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, el, resSize / 1048576.0, totalBlocks);
streamFreeQitem(pInput);
}
......
......@@ -15,7 +15,7 @@
#include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInc.h"
#include "streamInt.h"
#include "tref.h"
#include "ttimer.h"
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
#include "streamInt.h"
SStreamQueue* streamQueueOpen(int64_t cap) {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
#include "streamInt.h"
#include "ttimer.h"
#include "wal.h"
......@@ -53,7 +53,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
streamSetParamForScanHistoryData(pTask);
streamSetParamForScanHistory(pTask);
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartRecoverTask(pTask, 0);
......@@ -72,8 +72,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
}
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask);
streamSetParamForScanHistoryData(pTask);
streamAggRecoverPrepare(pTask);
streamSetParamForScanHistory(pTask);
streamAggScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
streamSetStatusNormal(pTask);
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
......@@ -202,10 +202,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
pTask->checkReqIds = NULL;
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs,
streamGetTaskStatusStr(pTask->status.taskStatus));
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
} else {
} else { // todo add assert, agg tasks?
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
......@@ -215,7 +215,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
} else {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH);
if (pRsp->reqId != pTask->checkReqId) {
return -1;
}
......@@ -233,8 +234,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} else {
ASSERT(0);
}
} else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
......@@ -248,7 +247,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
}
// common
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) {
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
}
......@@ -286,7 +285,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
......@@ -314,7 +313,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
......@@ -330,7 +329,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) {
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
......@@ -375,7 +374,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
}
// agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
pTask->numOfWaitingUpstream);
......@@ -391,19 +390,19 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
if (qStreamRecoverFinish(exec) < 0) {
return -1;
}
// streamSetStatusNormal(pTask);
return 0;
}
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
ASSERT(left >= 0);
if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks);
qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
pTask->id.idStr, numOfTasks);
streamAggUpstreamScanHistoryFinish(pTask);
} else {
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
......@@ -411,6 +410,7 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_
}
}
return 0;
}
......@@ -467,8 +467,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qWarn(
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or have been "
"destroyed, or should stop exec",
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
"destroyed, or should stop",
pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
......@@ -493,7 +493,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition
// an fill history task needs to be started.
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
......@@ -573,6 +573,27 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
return qStreamRecoverSetAllStepFinished(exec);
}
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
ASSERT(latestVer >= pRange->maxVer);
int64_t nextStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) {
// no input data yet. no need to execute the secondardy scan while stream task halt
streamTaskRecoverSetAllStepFinished(pTask);
qDebug(
"s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan",
pTask->id.idStr);
} else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
// [pTask->dataRange.range.maxVer, ver1]
pRange->minVer = nextStartVer;
pRange->maxVer = latestVer - 1;
}
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
......@@ -627,7 +648,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0;
}
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
......@@ -635,7 +656,7 @@ int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFi
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
......@@ -652,12 +673,12 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
// calculate the correct start time window, and start the handle the history data for the main task.
if (pTask->historyTaskId.taskId != 0) {
// check downstream tasks for associated scan-history-data tasks
streamCheckHistoryTaskDownstrem(pTask);
streamCheckHistoryTaskDownstream(pTask);
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer;
int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
......@@ -665,7 +686,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
} else {
......
......@@ -18,7 +18,7 @@
#include "osMemory.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "streamInc.h"
#include "streamInt.h"
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册