diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 47684e71824fcdb0027e61209b5905a3da327019..6eee9df52823f38bfcd1bdbae69952edc58cd245 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2779,7 +2779,7 @@ _error: return NULL; } -static SSDataBlock* getTableDataBlockImpl(void* param) { +static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; STableMergeScanInfo* pInfo = pOperator->info; @@ -2797,6 +2797,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); } @@ -2805,8 +2806,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } if (isTaskKilled(pTaskInfo)) { + qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo)); pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + break; } // process this data block based on the probabilities @@ -2819,6 +2821,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { + qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); } @@ -2909,7 +2912,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); } - tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); // one table has one data block int32_t numOfTable = tableEndIdx - tableStartIdx + 1; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7784bc0c94f765a356faeac0d360a3d327e933f6..0a8d7ee376b9a92a7c4bc5fb7544de0f6bb77030 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1049,12 +1049,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } if (pBlk == NULL) { break; - }; + } + + if (tsortIsClosed(pHandle)) { + tSimpleHashClear(mUidBlk); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + break; + } } + tSimpleHashCleanup(mUidBlk); taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); - taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + if (!tsortIsClosed(pHandle)) { + taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + } taosArrayDestroy(aExtSrc); pHandle->type = SORT_SINGLESOURCE_SORT;