diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 716b939e5f58cbc0b1c3b86069facf723af67d6e..1a786077709b84b2c12739c947f823b822efe0a3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,11 +16,11 @@ #include "streamInc.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 8 -#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 8 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 -static int32_t updateCheckPointInfo (SStreamTask* pTask); +static int32_t updateCheckPointInfo(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -48,10 +48,11 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return -1; } - qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, + size / 1048576.0); code = streamTaskOutputResultBlock(pTask, pStreamBlocks); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position destroyStreamDataBlock(pStreamBlocks); return -1; } @@ -65,7 +66,8 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return TSDB_CODE_SUCCESS; } -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, + int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -82,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } if (streamTaskShouldStop(&pTask->status)) { - taosArrayDestroy(pRes); // memory leak + taosArrayDestroy(pRes); // memory leak return 0; } @@ -101,7 +103,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SSDataBlock block = {0}; - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem; + const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); @@ -153,7 +155,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); } else { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } return code; @@ -286,7 +288,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif -int32_t updateCheckPointInfo (SStreamTask* pTask) { +int32_t updateCheckPointInfo(SStreamTask* pTask) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); @@ -294,7 +296,8 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { SCheckpointInfo* pCkInfo = &pTask->chkInfo; if (ckId > pCkInfo->id) { // save it since the checkpoint is updated qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 - ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); + ", checkPoint id:%" PRId64 " -> %" PRId64, + pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; @@ -417,14 +420,15 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, - pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, + pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; SArray* pBlockList = pBlock->blocks; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, + pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; @@ -445,8 +449,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t totalBlocks = 0; streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); - double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, + resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); }