diff --git a/docs/examples/python/tmq_assignment_example.py b/docs/examples/python/tmq_assignment_example.py
index a07347a9b9523f2040895c2973e27ccf31799000..41737e3fc498c785b6bd0e23b79f80c62f364476 100644
--- a/docs/examples/python/tmq_assignment_example.py
+++ b/docs/examples/python/tmq_assignment_example.py
@@ -55,4 +55,4 @@ def taos_get_assignment_and_seek_demo():
if __name__ == '__main__':
- taosws_get_assignment_and_seek_demo()
+ taos_get_assignment_and_seek_demo()
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index dbcc31a35ebf2a0f7595ebe26737fe143a2a5a0a..b48992b5f22914455c5a41f8bf46eb7c0b27f5cb 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -459,10 +459,10 @@ typedef struct {
int64_t streamId;
int32_t taskId;
int32_t childId;
-} SStreamRecoverFinishReq, SStreamTransferReq;
+} SStreamScanHistoryFinishReq, SStreamTransferReq;
-int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
-int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
+int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
+int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
typedef struct {
int64_t streamId;
@@ -537,8 +537,8 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
-int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
-int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
+int32_t tEncodeSStreamTaskScanHistoryReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
+int32_t tDecodeSStreamTaskScanHistoryReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
@@ -578,15 +578,17 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
-int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask);
+int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
+void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
+
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common
-int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
+int32_t streamSetParamForScanHistory(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
@@ -596,32 +598,29 @@ int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* p
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
-// int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level
-int32_t streamAggRecoverPrepare(SStreamTask* pTask);
-int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
+int32_t streamAggScanHistoryPrepare(SStreamTask* pTask);
+int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
+// stream task meta
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
-
-int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
-int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
-int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
-int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta);
-
+int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
+int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
+int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
+int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
-int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index 3d9adf815693dd969ab7fd7b2bc7da466e192ba8..e4bc184be32fcb2a5bfd45768d27322e4799e5da 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -279,11 +279,11 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
- SStreamRecoverFinishReq req;
+ SStreamScanHistoryFinishReq req;
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
- tDecodeStreamRecoverFinishReq(&decoder, &req);
+ tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
// find task
@@ -292,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
return -1;
}
// do process request
- if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
+ if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1;
}
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 5c08293591ea8c9fca46d83a0165a26bda886566..318af2a2eeec068d83f86cabf96072658a71ea3c 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -248,7 +248,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
-int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
+int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index cd4f8795d5ad244db7d8f3d9f45cff5303282aec..20061911bc4daa676c24fa26b868e4920b447d12 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1071,13 +1071,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
- // check param
- int64_t fillVer1 = pTask->chkInfo.version;
- if (fillVer1 <= 0) {
- streamMetaReleaseTask(pMeta, pTask);
- return -1;
- }
-
// do recovery step 1
const char* pId = pTask->id.idStr;
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
@@ -1091,7 +1084,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
- if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
+ if (!streamTaskRecoverScanStep1Finished(pTask)) {
streamSourceScanHistoryData(pTask);
}
@@ -1121,39 +1114,23 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// wait for the stream task get ready for scan history data
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
- tqDebug(
- "s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms",
- pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr);
+ tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId,
+ pTask->info.taskLevel, pId);
taosMsleep(100);
}
// now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
- tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
+ tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId,
pStreamTask->info.taskLevel, pId);
// if it's an source task, extract the last version in wal.
- pRange = &pTask->dataRange.range;
- int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
- ASSERT(latestVer >= pRange->maxVer);
-
- int64_t nextStartVer = pRange->maxVer + 1;
- if (nextStartVer > latestVer - 1) {
- // no input data yet. no need to execute the secondardy scan while stream task halt
- streamTaskRecoverSetAllStepFinished(pTask);
- tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", pId);
- } else {
- // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
- // [pTask->dataRange.range.maxVer, ver1]
- pRange->minVer = nextStartVer;
- pRange->maxVer = latestVer - 1;
- }
+ streamHistoryTaskSetVerRangeStep2(pTask);
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
- tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
- " do secondary scan-history-data after halt the related stream task:%s",
- pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
+ tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
+ pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs();
@@ -1162,6 +1139,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (!streamTaskRecoverScanStep2Finished(pTask)) {
streamSourceScanHistoryData(pTask);
+
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
streamMetaReleaseTask(pMeta, pTask);
@@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);
- // 3. notify the downstream tasks to transfer executor state after handle all history blocks.
+ // 3. notify downstream tasks to transfer executor state after handle all history blocks.
if (!pTask->status.transferState) {
code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) {
@@ -1210,14 +1188,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
- tqDebug("s-task:%s no associated task, reset the time window:%" PRId64 " - %" PRId64, pId, pWindow->skey,
- pWindow->ekey);
+ tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId,
+ pWindow->skey, pWindow->ekey);
} else {
- tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
- ", window:%" PRId64 " - %" PRId64,
- pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
+ tqDebug(
+ "s-task:%s history data in current time window scan completed, now start to handle data from WAL, start "
+ "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
+ pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
}
+ // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask);
@@ -1238,7 +1218,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
- int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
+ int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
@@ -1251,61 +1231,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
streamTaskReleaseState(pTask);
tqDebug("s-task:%s receive state transfer req", pTask->id.idStr);
+ // related stream task load the state from the state storage backend
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
- streamTaskReloadState(pStreamTask);
-
- ASSERT(pTask->streamTaskId.taskId != 0);
- pTask->status.transferState = true; // persistent data?
-
-#if 0
- // do check if current task handle all data in the input queue
- int64_t st = taosGetTimestampMs();
- tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);
-
- code = streamSourceRecoverScanStep2(pTask, sversion);
- if (code < 0) {
- streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- return -1;
- }
-
- qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);
-
- walReaderSeekVer(pTask->exec.pWalReader, sversion);
- pTask->chkInfo.currentVer = sversion;
-
- if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
- streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- return 0;
- }
-
- // restore param
- code = streamRestoreParam(pTask);
- if (code < 0) {
- streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- return -1;
- }
-
- // set status normal
- tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
- code = streamSetStatusNormal(pTask);
- if (code < 0) {
- streamMetaReleaseTask(pTq->pStreamMeta, pTask);
+ if (pStreamTask == NULL) {
+ tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
return -1;
}
- double el = (taosGetTimestampMs() - st) / 1000.0;
- tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
-
- // dispatch recover finish req to all related downstream task
- code = streamDispatchScanHistoryFinishMsg(pTask);
- if (code < 0) {
- streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- return -1;
- }
+ streamTaskReloadState(pStreamTask);
- atomic_store_8(&pTask->info.fillHistory, 0);
- streamMetaSaveTask(pTq->pStreamMeta, pTask);
-#endif
+ ASSERT(pTask->streamTaskId.taskId != 0);
+ pTask->status.transferState = true;
streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@@ -1313,31 +1249,28 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
return 0;
}
-int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
+int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
- SStreamRecoverFinishReq req;
+ SStreamScanHistoryFinishReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
- tDecodeStreamRecoverFinishReq(&decoder, &req);
+ tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
// find task
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask == NULL) {
- return -1;
- }
- // do process request
- if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
- streamMetaReleaseTask(pTq->pStreamMeta, pTask);
+ tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId);
return -1;
}
+ int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- return 0;
+ return code;
}
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
@@ -1423,10 +1356,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
pTask->chkInfo.version);
streamProcessRunReq(pTask);
} else {
-// if (streamTaskShouldPause(&pTask->status)) {
- atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
-// }
-
+ atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
}
@@ -1517,34 +1447,35 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
int32_t vgId = pTq->pStreamMeta->vgId;
- if (pTask) {
- if (streamTaskShouldPause(&pTask->status)) {
- atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
-
- // no lock needs to secure the access of the version
- if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
- // discard all the data when the stream task is suspended.
- walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
- tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
- ", schedStatus:%d",
- vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
- } else { // from the previous paused version and go on
- tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
- vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
- }
+ if (pTask == NULL) {
+ return -1;
+ }
- if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
- streamStartRecoverTask(pTask, igUntreated);
- } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
- tqStartStreamTasks(pTq);
- } else {
- streamSchedExec(pTask);
- }
+ if (streamTaskShouldPause(&pTask->status)) {
+ atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
+
+ // no lock needs to secure the access of the version
+ if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
+ // discard all the data when the stream task is suspended.
+ walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
+ tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
+ ", schedStatus:%d",
+ vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
+ } else { // from the previous paused version and go on
+ tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
+ vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
+ }
+
+ if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+ streamStartRecoverTask(pTask, igUntreated);
+ } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
+ tqStartStreamTasks(pTq);
+ } else {
+ streamSchedExec(pTask);
}
- streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- } else {
- return -1;
}
+
+ streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
@@ -1560,6 +1491,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
}
+
return code;
}
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 40bca578278328188b5a009bbf568988956c5fcf..88a260b3a37f706e8a09820aa6ab56cc56780a8b 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -665,7 +665,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
}
case TDMT_STREAM_SCAN_HISTORY_FINISH:
- return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
+ return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
default:
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index a3d94a0891e64ed2c2b0f165e0256a915279be0b..900505acb32fd5256a9481b96c9d800387d37138 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -911,8 +911,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;
- qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
- " - %" PRId64,
+ qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64
+ ", window:%" PRId64 " - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
@@ -999,31 +999,35 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
+ const char* id = GET_TASKID(pTaskInfo);
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
- if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
- pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
- pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
+ uint16_t type = pOperator->operatorType;
+ if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
+ type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
- qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
- } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
- pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
- pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
+ qInfo("%s restore stream agg executors param for interval: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
+ pInfo->twAggSup.deleteMark);
+ } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
+ type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
+ type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
- qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
- } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
+ qInfo("%s restore stream agg executor param for session: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
+ pInfo->twAggSup.deleteMark);
+ } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
- qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
+ qInfo("%s restore stream agg executor param for state: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
+ pInfo->twAggSup.deleteMark);
}
// iterate operator tree
@@ -1037,7 +1041,6 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
pOperator = pOperator->pDownstream[0];
}
}
- return 0;
}
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInt.h
similarity index 97%
rename from source/libs/stream/inc/streamInc.h
rename to source/libs/stream/inc/streamInt.h
index eec37d7dbb6cfaa4e7e3e9cd197c297256593657..2164b63cafa6a0083af1e6552f209f5614f2fb5c 100644
--- a/source/libs/stream/inc/streamInc.h
+++ b/source/libs/stream/inc/streamInt.h
@@ -49,7 +49,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
-int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
+int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 691d31e64cd8061f5a4a04507864501078ebc4f5..ddbc8da3ecc8cc920157f3a8a356850a5c917da0 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "streamInc.h"
+#include "streamInt.h"
#include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index 4646af641f4ebacbea9c0a7a0452c9ac55ed6ac9..18ec80e87a2777bb53967c8a23bfa1036586c74b 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -16,7 +16,7 @@
#include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
-#include "streamInc.h"
+#include "streamInt.h"
#include "tcommon.h"
#include "tref.h"
diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c
index 37923ca80720e314afe2b272629c1409c2341342..bad104bc8ee47ced42ff150ea7affa552047d187 100644
--- a/source/libs/stream/src/streamData.c
+++ b/source/libs/stream/src/streamData.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "streamInc.h"
+#include "streamInt.h"
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index d93de7b1e5d3b0811463643b512c4c8cf9aa341d..9241df2e707f8d74fe2798640c3fcd29f3e9ac71 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
+#include "streamInt.h"
#include "ttimer.h"
-#include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
@@ -276,14 +276,14 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
-int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
+int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
- tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
+ tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
@@ -299,7 +299,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
- if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) {
+ if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 6e1804b08ed638c1a8fdb5fcf005162e2188a066..bcb479e71ec50ac73f8df9008a7dae7748d223ca 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "streamInc.h"
+#include "streamInt.h"
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
@@ -358,7 +358,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
- // here we need to wait for the stream task handle all data in the input queue.
+ // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
+ // for the step 2. For a agg task
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
} else {
@@ -369,21 +370,18 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// wait for the stream task to be idle
waitForTaskIdle(pTask, pStreamTask);
+ // In case of sink tasks, no need to be halted for them.
+ // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
+ // start the task state transfer procedure.
+ // When a task is idle with halt status, all data in inputQ are consumed.
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
- ", status:%s, sched-status:%d",
+ ", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
-
- // todo transfer state
} else {
- // for sink tasks, they are continue to execute, no need to be halt.
- // the process should be stopped for a while, during the term of transfer task state.
- // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
- qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
-
- // todo transfer state
+ qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
}
// expand the query time window for stream scanner
@@ -401,81 +399,78 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
-/**
- * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
- * appropriate batch of blocks should be handled in 5 to 10 sec.
- */
-int32_t streamExecForAll(SStreamTask* pTask) {
- const char* id = pTask->id.idStr;
+static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
+ const char* id) {
+ int32_t retryTimes = 0;
+ int32_t MAX_RETRY_TIMES = 5;
while (1) {
- int32_t batchSize = 1;
- int16_t times = 0;
-
- SStreamQueueItem* pInput = NULL;
-
- // merge multiple input data if possible in the input queue.
- qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
+ if (streamTaskShouldPause(&pTask->status)) {
+ qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
+ return TSDB_CODE_SUCCESS;
+ }
- while (1) {
- // downstream task's input queue is blocked, stop immediately
- if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
- streamTaskShouldStop(&pTask->status)) {
- if (batchSize > 1) {
- break;
- } else {
- qDebug("123 %s", pTask->id.idStr);
- return 0;
- }
+ SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
+ if (qItem == NULL) {
+ if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
+ taosMsleep(10);
+ qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
+ continue;
}
- SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
- if (qItem == NULL) {
- if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
- times++;
- taosMsleep(10);
- qDebug("===stream===try again batchSize:%d", batchSize);
- continue;
- }
+ qDebug("===stream===break batchSize:%d", *numOfBlocks);
+ return TSDB_CODE_SUCCESS;
+ }
- qDebug("===stream===break batchSize:%d", batchSize);
- break;
- }
+ // do not merge blocks for sink node
+ if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
+ *numOfBlocks = 1;
+ *pInput = qItem;
+ return TSDB_CODE_SUCCESS;
+ }
- if (pInput == NULL) {
- pInput = qItem;
- streamQueueProcessSuccess(pTask->inputQueue);
- if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
- break;
- }
- } else {
- // todo we need to sort the data block, instead of just appending into the array list.
- void* newRet = NULL;
- if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
- streamQueueProcessFail(pTask->inputQueue);
- break;
- } else {
- batchSize++;
- pInput = newRet;
- streamQueueProcessSuccess(pTask->inputQueue);
-
- if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
- qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
- MAX_STREAM_EXEC_BATCH_NUM);
- break;
- }
- }
+ if (*pInput == NULL) {
+ ASSERT((*numOfBlocks) == 0);
+ *pInput = qItem;
+ } else {
+ // todo we need to sort the data block, instead of just appending into the array list.
+ void* newRet = streamMergeQueueItem(*pInput, qItem);
+ if (newRet == NULL) {
+ qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
+ streamQueueProcessFail(pTask->inputQueue);
+ return TSDB_CODE_SUCCESS;
}
+
+ *pInput = newRet;
}
- if (streamTaskShouldStop(&pTask->status)) {
- if (pInput) {
- streamFreeQitem(pInput);
- }
- return 0;
+ *numOfBlocks += 1;
+ streamQueueProcessSuccess(pTask->inputQueue);
+
+ if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
+ qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
+ return TSDB_CODE_SUCCESS;
}
+ }
+}
+
+/**
+ * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
+ * appropriate batch of blocks should be handled in 5 to 10 sec.
+ */
+int32_t streamExecForAll(SStreamTask* pTask) {
+ const char* id = pTask->id.idStr;
+
+ while (1) {
+ int32_t batchSize = 0;
+ SStreamQueueItem* pInput = NULL;
+
+ // merge multiple input data if possible in the input queue.
+ qDebug("s-task:%s start to extract data block from inputQ", id);
+ /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
if (pInput == NULL) {
+ ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask);
}
@@ -534,8 +529,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
- qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
- id, batchSize, el, resSize / 1048576.0, totalBlocks);
+ qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
+ id, el, resSize / 1048576.0, totalBlocks);
streamFreeQitem(pInput);
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 8242f84312a073fd0d83a3e1774773bac8ec0ac6..e1f625dd52535fcbf4116ea6adcfd06b3625fa09 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -15,7 +15,7 @@
#include "executor.h"
#include "streamBackendRocksdb.h"
-#include "streamInc.h"
+#include "streamInt.h"
#include "tref.h"
#include "ttimer.h"
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index 4cfeedab57fb289d12643e996f0b4cdd3f65c531..aaf9fdec724cf6765232cb8a77adfb36754ce6d9 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "streamInc.h"
+#include "streamInt.h"
SStreamQueue* streamQueueOpen(int64_t cap) {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 9ded58597ff3233098dfc2e213adc246d636903c..a3fc3418aa8f78c14e18ea26184e0f5a14d5bb72 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "streamInc.h"
+#include "streamInt.h"
#include "ttimer.h"
#include "wal.h"
@@ -53,7 +53,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
- streamSetParamForScanHistoryData(pTask);
+ streamSetParamForScanHistory(pTask);
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartRecoverTask(pTask, 0);
@@ -72,8 +72,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
}
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask);
- streamSetParamForScanHistoryData(pTask);
- streamAggRecoverPrepare(pTask);
+ streamSetParamForScanHistory(pTask);
+ streamAggScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
streamSetStatusNormal(pTask);
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
@@ -202,10 +202,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
pTask->checkReqIds = NULL;
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
- qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs,
- streamGetTaskStatusStr(pTask->status.taskStatus));
+ qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
+ numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
- } else {
+ } else { // todo add assert, agg tasks?
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
@@ -215,7 +215,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
- } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
+ } else {
+ ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH);
if (pRsp->reqId != pTask->checkReqId) {
return -1;
}
@@ -233,8 +234,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
- } else {
- ASSERT(0);
}
} else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
@@ -248,7 +247,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
}
// common
-int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) {
+int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
}
@@ -286,7 +285,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
- SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
+ SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
@@ -314,7 +313,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
SRpcMsg msg = {0};
int32_t tlen;
- tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
+ tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
@@ -330,7 +329,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
- if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) {
+ if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
@@ -375,7 +374,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
}
// agg
-int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
+int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
pTask->numOfWaitingUpstream);
@@ -391,19 +390,19 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
if (qStreamRecoverFinish(exec) < 0) {
return -1;
}
-
-// streamSetStatusNormal(pTask);
return 0;
}
-int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
+int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
ASSERT(left >= 0);
if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
- qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks);
+ qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
+ pTask->id.idStr, numOfTasks);
+
streamAggUpstreamScanHistoryFinish(pTask);
} else {
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
@@ -411,6 +410,7 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_
}
}
+
return 0;
}
@@ -467,8 +467,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qWarn(
- "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or have been "
- "destroyed, or should stop exec",
+ "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
+ "destroyed, or should stop",
pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
@@ -493,7 +493,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition
// an fill history task needs to be started.
-int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
+int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
@@ -573,6 +573,27 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
return qStreamRecoverSetAllStepFinished(exec);
}
+void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) {
+ SVersionRange* pRange = &pTask->dataRange.range;
+ int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
+ ASSERT(latestVer >= pRange->maxVer);
+
+ int64_t nextStartVer = pRange->maxVer + 1;
+ if (nextStartVer > latestVer - 1) {
+ // no input data yet. no need to execute the secondardy scan while stream task halt
+ streamTaskRecoverSetAllStepFinished(pTask);
+ qDebug(
+ "s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan",
+ pTask->id.idStr);
+ } else {
+ // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
+ // [pTask->dataRange.range.maxVer, ver1]
+ pRange->minVer = nextStartVer;
+ pRange->maxVer = latestVer - 1;
+ }
+}
+
+
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
@@ -627,7 +648,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0;
}
-int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
+int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
@@ -635,7 +656,7 @@ int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFi
tEndEncode(pEncoder);
return pEncoder->pos;
}
-int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
+int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
@@ -652,12 +673,12 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
// calculate the correct start time window, and start the handle the history data for the main task.
if (pTask->historyTaskId.taskId != 0) {
// check downstream tasks for associated scan-history-data tasks
- streamCheckHistoryTaskDownstrem(pTask);
+ streamCheckHistoryTaskDownstream(pTask);
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
- int64_t ekey = pRange->window.ekey + 1;
- int64_t ver = pRange->range.minVer;
+ int64_t ekey = pRange->window.ekey + 1;
+ int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
@@ -665,7 +686,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
- ", ver range:%" PRId64 " - %" PRId64,
+ ", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
} else {
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index 9873e7b4c82e81350deb9a1d198d6c7a016ffc1c..0a4f73a67c8afb319aa4da90b5f0253368d85dee 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -18,7 +18,7 @@
#include "osMemory.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
-#include "streamInc.h"
+#include "streamInt.h"
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"