提交 f84bfc96 编写于 作者: H Haojun Liao

fix(stream): fix error in pause in stream.

上级 5d4efe11
...@@ -325,7 +325,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -325,7 +325,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
while (1) { while (1) {
int32_t batchSize = 1; int32_t batchSize = 0;
int16_t times = 0; int16_t times = 0;
SStreamQueueItem* pInput = NULL; SStreamQueueItem* pInput = NULL;
...@@ -335,8 +335,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -335,8 +335,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%s", pTask->id.idStr, batchSize); qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, batchSize);
if (batchSize > 1) { if (batchSize > 0) {
break; break;
} else { } else {
return 0; return 0;
...@@ -357,6 +357,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -357,6 +357,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
if (pInput == NULL) { if (pInput == NULL) {
batchSize += 1;
pInput = qItem; pInput = qItem;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->taskLevel == TASK_LEVEL__SINK) { if (pTask->taskLevel == TASK_LEVEL__SINK) {
...@@ -364,18 +366,20 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -364,18 +366,20 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
} else { } else {
// todo we need to sort the data block, instead of just appending into the array list. // todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL; ASSERT(batchSize >= 1);
if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
void* newRet = streamMergeQueueItem(pInput, qItem);
if (newRet == NULL) {
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
break; break;
} else { } else {
batchSize++; batchSize += 1;
pInput = newRet; pInput = newRet;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { if (batchSize >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
MAX_STREAM_EXEC_BATCH_NUM);
break; break;
} }
} }
...@@ -390,6 +394,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -390,6 +394,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// } // }
if (pInput == NULL) { if (pInput == NULL) {
ASSERT(batchSize == 0);
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册