diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 00acc4741d78af3bebece1c04ed791e02b2e9fe9..e48f128f06d9ccbc3501d7843410895b1cfed305 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -36,6 +36,10 @@ typedef struct SReadHandle { void* vnode; void* mnd; SMsgCb* pMsgCb; + + /* XXXXXXXXXXXXXXXXXXXX */ + int32_t deleteQuery; + /* XXXXXXXXXXXXXXXXXXXX */ // int8_t initTsdbReader; } SReadHandle; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 501b259c2804ab0c483e964004a40fe948b9a836..b947c560c661149216bd1b548f2bc01f2e340f0c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -151,6 +151,10 @@ typedef struct SExecTaskInfo { jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] struct SOperatorInfo* pRoot; + + /* XXXXXXXXXXXXXXXXXXXX */ + SReadHandle* pHandle; + /* XXXXXXXXXXXXXXXXXXXX */ } SExecTaskInfo; enum { diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e4c0959185bb0e52de092164a49f9715d28c80c6..91488eceb7bb4dee83fde713944c420c233869fa 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -138,8 +138,27 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); - int64_t st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + int64_t st = taosGetTimestampUs(); + /* XXXXXXXXXXXXXXXXXXXX */ + if (pTaskInfo->pHandle->deleteQuery) { + static int32_t first = 1; + if (first) { + *pRes = createDataBlock(); + int64_t rows = 33; + SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, 8, 1); + blockDataAppendColInfo(*pRes, &infoData); + SColumnInfoData* pCol1 = taosArrayGet((*pRes)->pDataBlock, 0); + colDataAppend(pCol1, 0, (char*)&rows, false); + first = 0; + } else { + *pRes = NULL; + } + } else { + /* XXXXXXXXXXXXXXXXXXXX */ + *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + /* XXXXXXXXXXXXXXXXXXXX */ + } + /* XXXXXXXXXXXXXXXXXXXX */ uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a5a157ceae71c79363e81da7ab4d247c3db23a1f..0d84d67d7e452a1df06528af5187db11dc6be7c6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4638,6 +4638,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->user); + + /* XXXXXXXXXXXXXXXXXXXX */ + (*pTaskInfo)->pHandle = pHandle; + /* XXXXXXXXXXXXXXXXXXXX */ if (NULL == (*pTaskInfo)->pRoot) { code = (*pTaskInfo)->code; goto _complete; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index eb40b00d145758114c7afc690726d6e0f2e0c31d..402752a86ccb7dc4b27d9de886db884ce5168230 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -936,6 +936,11 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { ctx.plan = plan; + /* XXXXXXXXXXXXXXXXXXXX */ + SReadHandle *phandle = (SReadHandle*)qwMsg->node; + phandle->deleteQuery = 1; + /* XXXXXXXXXXXXXXXXXXXX */ + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));