diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 536cbed33e8353d704ef30096d2b2514d8e28632..f5dba354405855cdc5a308dce7db91d6ac9c63cb 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1306,6 +1306,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { colDataAssign(pDst, pSrc, src->info.rows, &src->info); } + uint32_t cap = dst->info.capacity; dst->info = src->info; dst->info.capacity = cap; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e70cf37c63260cd03ff0075a926c51de975d24c2..a2e269dd51a117b87b7587924a11e035f4818f82 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -153,14 +153,13 @@ typedef struct { SSchemaWrapper* qsw; } SSchemaInfo; -typedef struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; - +struct SExecTaskInfo { + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; int64_t version; // used for stream to record wal version SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; @@ -171,7 +170,8 @@ typedef struct SExecTaskInfo { SSubplan* pSubplan; struct SOperatorInfo* pRoot; SLocalFetch localFetch; -} SExecTaskInfo; + SArray* pResultBlockList;// result block list +}; enum { OP_NOT_OPENED = 0x0, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1aa9a3c61359cc3a0ad76df577c5ce3d6f061c8b..0163a389ef21e3ab8516a268bdb2701194cc2b89 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -536,7 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); } - taosArrayClearEx(pResList, freeBlock); + taosArrayClear(pResList); int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { @@ -574,8 +574,20 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo int64_t st = taosGetTimestampUs(); + int32_t blockIndex = 0; while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) { - SSDataBlock* p = createOneDataBlock(pRes, true); + SSDataBlock* p = NULL; + if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) { + SSDataBlock* p1 = createOneDataBlock(pRes, true); + taosArrayPush(pTaskInfo->pResultBlockList, &p1); + p = p1; + } else { + p = *(SSDataBlock**) taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); + copyDataBlock(p, pRes); + } + + blockIndex += 1; + current += p->info.rows; ASSERT(p->info.rows > 0); taosArrayPush(pResList, &p); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 709e981a1ff6bcb01a0f4765ec97eaf5c428c180..f16462268755d5e4936987f83d58d697bcdb5d92 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2600,6 +2600,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; pTaskInfo->pTableInfoList = tableListCreate(); + pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); @@ -3201,6 +3202,11 @@ _complete: return terrno; } +static void freeBlock(void* pParam) { + SSDataBlock* pBlock = *(SSDataBlock**)pParam; + blockDataDestroy(pBlock); +} + void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); @@ -3213,6 +3219,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); } + taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a7cd3db82432f9e148ba02bfcc4e5faa02f8b119..2606556838435936e0525afdd524cd53e04f0772 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -18,11 +18,6 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; -static void freeBlock(void *param) { - SSDataBlock *pBlock = *(SSDataBlock **)param; - blockDataDestroy(pBlock); -} - int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; @@ -193,7 +188,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } _return: - taosArrayDestroyEx(pResList, freeBlock); + taosArrayDestroy(pResList); QW_RET(code); }