提交 76bf0aea 编写于 作者: L liuyao

opt stream input queue

上级 74ab6897
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
#include "streamInc.h" #include "streamInc.h"
#define STREAM_EXEC_MAX_BATCH_NUM 10240 #define MAX_STREAM_EXEC_BATCH_NUM 10240
#define MIN_STREAM_EXEC_BATCH_NUM 16
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
...@@ -260,12 +261,18 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -260,12 +261,18 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
int32_t batchSize = 1; int32_t batchSize = 1;
void* pInput = NULL; void* pInput = NULL;
int16_t times = 0;
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr); if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
times++;
taosMsleep(1);
qDebug("===stream===try agian batchSize:%d", batchSize);
continue;
}
break; break;
} }
...@@ -284,7 +291,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -284,7 +291,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
batchSize++; batchSize++;
pInput = newRet; pInput = newRet;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) { if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
break; break;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册