未验证 提交 edd4a787 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #22408 from taosdata/fix/3.0/TS-3810

fix: make kill query work for sysscanoperator
...@@ -99,7 +99,7 @@ struct SExecTaskInfo { ...@@ -99,7 +99,7 @@ struct SExecTaskInfo {
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI); SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(void* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
......
...@@ -191,7 +191,8 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols); ...@@ -191,7 +191,8 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols);
bool tsortIsClosed(SSortHandle* pHandle); bool tsortIsClosed(SSortHandle* pHandle);
void tsortSetClosed(SSortHandle* pHandle); void tsortSetClosed(SSortHandle* pHandle);
void setSingleTableMerge(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle);
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -59,7 +59,7 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP ...@@ -59,7 +59,7 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP
return pTaskInfo; return pTaskInfo;
} }
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code); } bool isTaskKilled(void* pTaskInfo) { return (0 != ((SExecTaskInfo*)pTaskInfo)->code); }
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
pTaskInfo->code = rspCode; pTaskInfo->code = rspCode;
......
...@@ -2928,8 +2928,9 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2928,8 +2928,9 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
} }
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
...@@ -2949,7 +2950,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2949,7 +2950,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (numOfTable == 1) { if (numOfTable == 1) {
setSingleTableMerge(pInfo->pSortHandle); tsortSetSingleTableMerge(pInfo->pSortHandle);
} else { } else {
code = tsortOpen(pInfo->pSortHandle); code = tsortOpen(pInfo->pSortHandle);
} }
......
...@@ -1601,6 +1601,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1601,6 +1601,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
char dbName[TSDB_DB_NAME_LEN] = {0}; char dbName[TSDB_DB_NAME_LEN] = {0};
if (isTaskKilled(pOperator->pTaskInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
const char* name = tNameGetTableName(&pInfo->name); const char* name = tNameGetTableName(&pInfo->name);
......
...@@ -71,12 +71,20 @@ struct SSortHandle { ...@@ -71,12 +71,20 @@ struct SSortHandle {
SMultiwayMergeTreeInfo* pMergeTree; SMultiwayMergeTreeInfo* pMergeTree;
bool singleTableMerge; bool singleTableMerge;
bool (*abortCheckFn)(void* param);
void* abortCheckParam;
}; };
void setSingleTableMerge(SSortHandle* pHandle) { void tsortSetSingleTableMerge(SSortHandle* pHandle) {
pHandle->singleTableMerge = true; pHandle->singleTableMerge = true;
} }
void tsortSetAbortCheckFn(SSortHandle *pHandle, bool (*checkFn)(void *), void* param) {
pHandle->abortCheckFn = checkFn;
pHandle->abortCheckParam = param;
}
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param); static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
// | offset[0] | offset[1] |....| nullbitmap | data |...| // | offset[0] | offset[1] |....| nullbitmap | data |...|
...@@ -726,11 +734,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -726,11 +734,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) { while (1) {
if (tsortIsClosed(pHandle)) { if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED; code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
return code; return code;
} }
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册