未验证 提交 fad68716 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19255 from taosdata/enh/stream_drop_when_scan

enh: optimize drop when scan exec
...@@ -1112,6 +1112,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1112,6 +1112,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step 1 // do recovery step 1
streamSourceRecoverScanStep1(pTask); streamSourceRecoverScanStep1(pTask);
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// build msg to launch next step // build msg to launch next step
SStreamRecoverStep2Req req; SStreamRecoverStep2Req req;
code = streamBuildSourceRecover2Req(pTask, &req); code = streamBuildSourceRecover2Req(pTask, &req);
...@@ -1122,6 +1127,10 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1122,6 +1127,10 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
return 0;
}
// serialize msg // serialize msg
int32_t len = sizeof(SStreamRecoverStep1Req); int32_t len = sizeof(SStreamRecoverStep1Req);
...@@ -1160,6 +1169,11 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -1160,6 +1169,11 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
return -1; return -1;
} }
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// restore param // restore param
code = streamRestoreParam(pTask); code = streamRestoreParam(pTask);
if (code < 0) { if (code < 0) {
......
...@@ -110,10 +110,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -110,10 +110,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
return 0;
}
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(0); return -1;
} }
if (output == NULL) { if (output == NULL) {
if (qStreamRecoverScanFinished(exec)) { if (qStreamRecoverScanFinished(exec)) {
......
...@@ -200,7 +200,6 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* ...@@ -200,7 +200,6 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
if (qStreamSourceRecoverStep2(exec, ver) < 0) { if (qStreamSourceRecoverStep2(exec, ver) < 0) {
ASSERT(0);
} }
return streamScanExec(pTask, 100); return streamScanExec(pTask, 100);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册