From bb4f8515bd0236273223817657be5c8db9888f4d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Aug 2023 11:53:17 +0800 Subject: [PATCH] fix(stream): pause when outputQ is blocked. --- source/libs/stream/inc/streamInt.h | 3 +++ source/libs/stream/src/stream.c | 11 +++++------ source/libs/stream/src/streamExec.c | 16 +++++++++++----- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7a557a744a..ffd0eedea1 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,6 +26,9 @@ extern "C" { #endif +#define ONE_MB_F (1048576.0) +#define SIZE_IN_MB(_v) ((_v) / ONE_MB_F) + typedef struct { int8_t inited; void* timer; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 03a0f3586d..5b6238330d 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -18,9 +18,8 @@ #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define ONE_MB_F (1048576.0) -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) SStreamGlobalEnv streamEnv; int32_t streamInit() { @@ -178,7 +177,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -// todo add log int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t code = 0; int32_t type = pTask->outputInfo.type; @@ -191,11 +189,12 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } else { ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); - if (code != 0) { // todo failed to add it into the output queue, free it. - return code; + if (code != 0) { + qError("s-task:%s failed to put res into outputQ", pTask->id.idStr); } streamDispatchStreamBlock(pTask); + return code; } return 0; @@ -359,7 +358,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - msgLen, ver, total, size + msgLen/1048576.0); + msgLen, ver, total, size + SIZE_IN_MB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3ab6643802..d1827993a5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,7 +18,7 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MIN_STREAM_EXEC_BATCH_NUM 4 -#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 +#define STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo(SStreamTask* pTask); static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); @@ -51,7 +51,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* } qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, - size / 1048576.0); + SIZE_IN_MB(size)); code = streamTaskOutputResultBlock(pTask, pStreamBlocks); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position @@ -137,10 +137,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i taosArrayPush(pRes, &block); qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, - pTask->info.selfChildId, numOfBlocks, size / 1048576.0); + pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size)); // current output should be dispatched to down stream nodes - if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) { + if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) { ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); if (code != TSDB_CODE_SUCCESS) { @@ -570,6 +570,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } + if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); + taosMsleep(1000); + continue; + } + // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); @@ -636,7 +642,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { double el = (taosGetTimestampMs() - st) / 1000.0; qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", - id, el, resSize / 1048576.0, totalBlocks); + id, el, SIZE_IN_MB(resSize), totalBlocks); streamFreeQitem(pInput); } -- GitLab