提交 03dbcbf0 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 ff2bf356
...@@ -459,10 +459,10 @@ typedef struct { ...@@ -459,10 +459,10 @@ typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int32_t childId; int32_t childId;
} SStreamRecoverFinishReq, SStreamTransferReq; } SStreamScanHistoryFinishReq, SStreamTransferReq;
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq); int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq); int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
...@@ -537,8 +537,8 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) ...@@ -537,8 +537,8 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq); int32_t tEncodeSStreamTaskScanHistoryReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq); int32_t tDecodeSStreamTaskScanHistoryReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp); int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp); int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
...@@ -578,15 +578,17 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); ...@@ -578,15 +578,17 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask); int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask); int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common // common
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask); int32_t streamSetParamForScanHistory(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status); const char* streamGetTaskStatusStr(int32_t status);
...@@ -596,32 +598,29 @@ int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* p ...@@ -596,32 +598,29 @@ int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* p
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamSourceScanHistoryData(SStreamTask* pTask);
// int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level // agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask); int32_t streamAggScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
// stream task meta
void streamMetaInit(); void streamMetaInit();
void streamMetaCleanup(); void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint // checkpoint
......
...@@ -279,11 +279,11 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -279,11 +279,11 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize // deserialize
SStreamRecoverFinishReq req; SStreamScanHistoryFinishReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, msg, msgLen);
tDecodeStreamRecoverFinishReq(&decoder, &req); tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
...@@ -292,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -292,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
return -1; return -1;
} }
// do process request // do process request
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) { if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1; return -1;
} }
......
...@@ -248,7 +248,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); ...@@ -248,7 +248,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version); int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
...@@ -1071,13 +1071,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1071,13 +1071,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return -1; return -1;
} }
// check param
int64_t fillVer1 = pTask->chkInfo.version;
if (fillVer1 <= 0) {
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
// do recovery step 1 // do recovery step 1
const char* pId = pTask->id.idStr; const char* pId = pTask->id.idStr;
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId, tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
...@@ -1091,7 +1084,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1091,7 +1084,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { if (!streamTaskRecoverScanStep1Finished(pTask)) {
streamSourceScanHistoryData(pTask); streamSourceScanHistoryData(pTask);
} }
...@@ -1121,39 +1114,23 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1121,39 +1114,23 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// wait for the stream task get ready for scan history data // wait for the stream task get ready for scan history data
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug( tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId,
"s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms", pTask->info.taskLevel, pId);
pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr);
taosMsleep(100); taosMsleep(100);
} }
// now we can stop the stream task execution // now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId,
pStreamTask->info.taskLevel, pId); pStreamTask->info.taskLevel, pId);
// if it's an source task, extract the last version in wal. // if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range; streamHistoryTaskSetVerRangeStep2(pTask);
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
ASSERT(latestVer >= pRange->maxVer);
int64_t nextStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) {
// no input data yet. no need to execute the secondardy scan while stream task halt
streamTaskRecoverSetAllStepFinished(pTask);
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", pId);
} else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
// [pTask->dataRange.range.maxVer, ver1]
pRange->minVer = nextStartVer;
pRange->maxVer = latestVer - 1;
}
} }
if (!streamTaskRecoverScanStep1Finished(pTask)) { if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
" do secondary scan-history-data after halt the related stream task:%s", pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId);
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs(); st = taosGetTimestampMs();
...@@ -1162,6 +1139,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1162,6 +1139,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (!streamTaskRecoverScanStep2Finished(pTask)) { if (!streamTaskRecoverScanStep2Finished(pTask)) {
streamSourceScanHistoryData(pTask); streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
...@@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
el = (taosGetTimestampMs() - st) / 1000.0; el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el); tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);
// 3. notify the downstream tasks to transfer executor state after handle all history blocks. // 3. notify downstream tasks to transfer executor state after handle all history blocks.
if (!pTask->status.transferState) { if (!pTask->status.transferState) {
code = streamDispatchTransferStateMsg(pTask); code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1210,14 +1188,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1210,14 +1188,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug("s-task:%s no associated task, reset the time window:%" PRId64 " - %" PRId64, pId, pWindow->skey, tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId,
pWindow->ekey); pWindow->skey, pWindow->ekey);
} else { } else {
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64 tqDebug(
", window:%" PRId64 " - %" PRId64, "s-task:%s history data in current time window scan completed, now start to handle data from WAL, start "
pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
} }
// notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
code = streamTaskScanHistoryDataComplete(pTask); code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
...@@ -1238,7 +1218,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int ...@@ -1238,7 +1218,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req); int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
...@@ -1251,61 +1231,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int ...@@ -1251,61 +1231,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
tqDebug("s-task:%s receive state transfer req", pTask->id.idStr); tqDebug("s-task:%s receive state transfer req", pTask->id.idStr);
// related stream task load the state from the state storage backend
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
streamTaskReloadState(pStreamTask); if (pStreamTask == NULL) {
tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
ASSERT(pTask->streamTaskId.taskId != 0);
pTask->status.transferState = true; // persistent data?
#if 0
// do check if current task handle all data in the input queue
int64_t st = taosGetTimestampMs();
tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);
code = streamSourceRecoverScanStep2(pTask, sversion);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
}
qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);
walReaderSeekVer(pTask->exec.pWalReader, sversion);
pTask->chkInfo.currentVer = sversion;
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// restore param
code = streamRestoreParam(pTask);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
}
// set status normal
tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr);
code = streamSetStatusNormal(pTask);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
double el = (taosGetTimestampMs() - st) / 1000.0; streamTaskReloadState(pStreamTask);
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
// dispatch recover finish req to all related downstream task
code = streamDispatchScanHistoryFinishMsg(pTask);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
}
atomic_store_8(&pTask->info.fillHistory, 0); ASSERT(pTask->streamTaskId.taskId != 0);
streamMetaSaveTask(pTq->pStreamMeta, pTask); pTask->status.transferState = true;
#endif
streamSchedExec(pTask); streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
...@@ -1313,31 +1249,28 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int ...@@ -1313,31 +1249,28 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
return 0; return 0;
} }
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize // deserialize
SStreamRecoverFinishReq req; SStreamScanHistoryFinishReq req = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeStreamRecoverFinishReq(&decoder, &req); tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId);
}
// do process request
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return code;
} }
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
...@@ -1423,10 +1356,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1423,10 +1356,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
pTask->chkInfo.version); pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
} else { } else {
// if (streamTaskShouldPause(&pTask->status)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// }
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
} }
...@@ -1517,34 +1447,35 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1517,34 +1447,35 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) { int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
int32_t vgId = pTq->pStreamMeta->vgId; int32_t vgId = pTq->pStreamMeta->vgId;
if (pTask) { if (pTask == NULL) {
if (streamTaskShouldPause(&pTask->status)) { return -1;
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); }
// no lock needs to secure the access of the version
if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (streamTaskShouldPause(&pTask->status)) {
streamStartRecoverTask(pTask, igUntreated); atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
tqStartStreamTasks(pTq); // no lock needs to secure the access of the version
} else { if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
streamSchedExec(pTask); // discard all the data when the stream task is suspended.
} walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamStartRecoverTask(pTask, igUntreated);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
tqStartStreamTasks(pTq);
} else {
streamSchedExec(pTask);
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} else {
return -1;
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
...@@ -1560,6 +1491,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1560,6 +1491,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
if (pHistoryTask) { if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
} }
return code; return code;
} }
......
...@@ -665,7 +665,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) ...@@ -665,7 +665,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len); return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
} }
case TDMT_STREAM_SCAN_HISTORY_FINISH: case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
default: default:
......
...@@ -911,8 +911,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan ...@@ -911,8 +911,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->recoverStep1Finished = true; pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false; pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64
" - %" PRId64, ", window:%" PRId64 " - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey); pWindow->ekey);
return 0; return 0;
...@@ -999,31 +999,35 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { ...@@ -999,31 +999,35 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
const char* id = GET_TASKID(pTaskInfo);
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) { while (1) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || uint16_t type = pOperator->operatorType;
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("%s restore stream agg executors param for interval: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || pInfo->twAggSup.deleteMark);
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("%s restore stream agg executor param for session: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { pInfo->twAggSup.deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("%s restore stream agg executor param for state: %d, %" PRId64, id, pInfo->twAggSup.calTrigger,
pInfo->twAggSup.deleteMark);
} }
// iterate operator tree // iterate operator tree
...@@ -1037,7 +1041,6 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { ...@@ -1037,7 +1041,6 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
pOperator = pOperator->pDownstream[0]; pOperator = pOperator->pDownstream[0];
} }
} }
return 0;
} }
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
......
...@@ -49,7 +49,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p ...@@ -49,7 +49,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet); SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInc.h" #include "streamInt.h"
#include "ttimer.h" #include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "executor.h" #include "executor.h"
#include "query.h" #include "query.h"
#include "streamInc.h" #include "streamInt.h"
#include "tcommon.h" #include "tcommon.h"
#include "tref.h" #include "tref.h"
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInc.h" #include "streamInt.h"
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInt.h"
#include "ttimer.h" #include "ttimer.h"
#include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024 #define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300 #define DISPATCH_RETRY_INTERVAL_MS 300
...@@ -276,14 +276,14 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR ...@@ -276,14 +276,14 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0; return 0;
} }
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) { SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
SRpcMsg msg = {0}; SRpcMsg msg = {0};
int32_t tlen; int32_t tlen;
tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code); tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
...@@ -299,7 +299,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe ...@@ -299,7 +299,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) { if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) { if (buf) {
rpcFreeCont(buf); rpcFreeCont(buf);
} }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInc.h" #include "streamInt.h"
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
...@@ -358,7 +358,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -358,7 +358,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// here we need to wait for the stream task handle all data in the input queue. // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2. For a agg task
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT); ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
} else { } else {
...@@ -369,21 +370,18 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -369,21 +370,18 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// wait for the stream task to be idle // wait for the stream task to be idle
waitForTaskIdle(pTask, pStreamTask); waitForTaskIdle(pTask, pStreamTask);
// In case of sink tasks, no need to be halted for them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
// When a task is idle with halt status, all data in inputQ are consumed.
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task. // update the scan data range for source task.
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d", ", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
// todo transfer state
} else { } else {
// for sink tasks, they are continue to execute, no need to be halt. qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
// the process should be stopped for a while, during the term of transfer task state.
// OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
// todo transfer state
} }
// expand the query time window for stream scanner // expand the query time window for stream scanner
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "executor.h" #include "executor.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInc.h" #include "streamInt.h"
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInc.h" #include "streamInt.h"
SStreamQueue* streamQueueOpen(int64_t cap) { SStreamQueue* streamQueueOpen(int64_t cap) {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInc.h" #include "streamInt.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
...@@ -53,7 +53,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { ...@@ -53,7 +53,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr, qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer); pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
streamSetParamForScanHistoryData(pTask); streamSetParamForScanHistory(pTask);
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartRecoverTask(pTask, 0); int32_t code = streamStartRecoverTask(pTask, 0);
...@@ -72,8 +72,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { ...@@ -72,8 +72,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
} }
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
streamSetParamForScanHistoryData(pTask); streamSetParamForScanHistory(pTask);
streamAggRecoverPrepare(pTask); streamAggScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr); qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
...@@ -202,10 +202,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -202,10 +202,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs, qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus)); numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask); streamTaskLaunchScanHistory(pTask);
} else { } else { // todo add assert, agg tasks?
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id, qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus)); streamGetTaskStatusStr(pTask->status.taskStatus));
...@@ -215,7 +215,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -215,7 +215,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
} }
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH);
if (pRsp->reqId != pTask->checkReqId) { if (pRsp->reqId != pTask->checkReqId) {
return -1; return -1;
} }
...@@ -233,8 +234,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -233,8 +234,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id, qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus)); streamGetTaskStatusStr(pTask->status.taskStatus));
} }
} else {
ASSERT(0);
} }
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId, qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
...@@ -248,7 +247,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -248,7 +247,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} }
// common // common
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) { int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
} }
...@@ -286,7 +285,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { ...@@ -286,7 +285,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
} }
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
// serialize // serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
...@@ -314,7 +313,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe ...@@ -314,7 +313,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
SRpcMsg msg = {0}; SRpcMsg msg = {0};
int32_t tlen; int32_t tlen;
tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code); tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
...@@ -330,7 +329,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe ...@@ -330,7 +329,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) { if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) { if (buf) {
rpcFreeCont(buf); rpcFreeCont(buf);
} }
...@@ -375,7 +374,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { ...@@ -375,7 +374,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
} }
// agg // agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) { int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr, qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
pTask->numOfWaitingUpstream); pTask->numOfWaitingUpstream);
...@@ -391,19 +390,19 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { ...@@ -391,19 +390,19 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
if (qStreamRecoverFinish(exec) < 0) { if (qStreamRecoverFinish(exec) < 0) {
return -1; return -1;
} }
// streamSetStatusNormal(pTask);
return 0; return 0;
} }
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) { int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
if (pTask->info.taskLevel == TASK_LEVEL__AGG) { if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks); qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
pTask->id.idStr, numOfTasks);
streamAggUpstreamScanHistoryFinish(pTask); streamAggUpstreamScanHistoryFinish(pTask);
} else { } else {
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
...@@ -411,6 +410,7 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_ ...@@ -411,6 +410,7 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_
} }
} }
return 0; return 0;
} }
...@@ -467,8 +467,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ...@@ -467,8 +467,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qWarn( qWarn(
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or have been " "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been "
"destroyed, or should stop exec", "destroyed, or should stop",
pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId); pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
...@@ -493,7 +493,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ...@@ -493,7 +493,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition // todo fix the bug: 2. race condition
// an fill history task needs to be started. // an fill history task needs to be started.
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) { int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId; int32_t hTaskId = pTask->historyTaskId.taskId;
...@@ -573,6 +573,27 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { ...@@ -573,6 +573,27 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
return qStreamRecoverSetAllStepFinished(exec); return qStreamRecoverSetAllStepFinished(exec);
} }
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
ASSERT(latestVer >= pRange->maxVer);
int64_t nextStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) {
// no input data yet. no need to execute the secondardy scan while stream task halt
streamTaskRecoverSetAllStepFinished(pTask);
qDebug(
"s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan",
pTask->id.idStr);
} else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
// [pTask->dataRange.range.maxVer, ver1]
pRange->minVer = nextStartVer;
pRange->maxVer = latestVer - 1;
}
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
...@@ -627,7 +648,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) ...@@ -627,7 +648,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0; return 0;
} }
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) { int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
...@@ -635,7 +656,7 @@ int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFi ...@@ -635,7 +656,7 @@ int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFi
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) { int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
...@@ -652,12 +673,12 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { ...@@ -652,12 +673,12 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
// calculate the correct start time window, and start the handle the history data for the main task. // calculate the correct start time window, and start the handle the history data for the main task.
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
// check downstream tasks for associated scan-history-data tasks // check downstream tasks for associated scan-history-data tasks
streamCheckHistoryTaskDownstrem(pTask); streamCheckHistoryTaskDownstream(pTask);
// launch current task // launch current task
SHistDataRange* pRange = &pTask->dataRange; SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1; int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer; int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey; pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX; pRange->window.ekey = INT64_MAX;
...@@ -665,7 +686,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { ...@@ -665,7 +686,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
pRange->range.maxVer = ver; pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64, ", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer); pRange->range.maxVer);
} else { } else {
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "osMemory.h" #include "osMemory.h"
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInc.h" #include "streamInt.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcommon.h" #include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册