diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 86308984efadc4d60e4534401f9f353efd32fc0a..5d9161b5a3b19c8acf52ac0bcba3046574808bee 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2488,6 +2488,7 @@ typedef struct { int64_t stime; // timestamp precision ms int64_t reqRid; bool stableQuery; + bool isSubQuery; char fqdn[TSDB_FQDN_LEN]; int32_t subPlanNum; SArray* subDesc; // SArray diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index afa19c674e585471f1ef930d0ffcb9edea0c5ce8..fa444779f301ac7919f8114545f8f1c36ec82a90 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -256,6 +256,7 @@ typedef struct SRequestObj { bool validateOnly; // todo refactor bool killed; bool inRetry; + bool isSubReq; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; int64_t allocatorRefId; @@ -398,6 +399,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code); int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest); int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce); void returnToUser(SRequestObj* pRequest); +void stopAllQueries(SRequestObj *pRequest); #ifdef __cplusplus } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 4f883ca1f4022f90b8643e03a1918d364724660b..c64bbfbdb676ad6a4654da068aa1b747fa33327e 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -363,6 +363,11 @@ void destroySubRequests(SRequestObj *pRequest) { int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; uint64_t tmpRefId = 0; + + if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { + return; + } + SRequestObj* pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; @@ -454,6 +459,63 @@ void destroyRequest(SRequestObj *pRequest) { removeRequest(pRequest->self); } +void taosStopQueryImpl(SRequestObj *pRequest) { + pRequest->killed = true; + + // It is not a query, no need to stop. + if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) { + tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId); + return; + } + + schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED); + tscDebug("request %" PRIx64 " killed", pRequest->requestId); +} + +void stopAllQueries(SRequestObj *pRequest) { + int32_t reqIdx = -1; + SRequestObj *pReqList[16] = {NULL}; + uint64_t tmpRefId = 0; + + if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { + return; + } + + SRequestObj* pTmp = pRequest; + while (pTmp->relation.prevRefId) { + tmpRefId = pTmp->relation.prevRefId; + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + pReqList[++reqIdx] = pTmp; + releaseRequest(tmpRefId); + } else { + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, + tmpRefId, pTmp->requestId); + break; + } + } + + for (int32_t i = reqIdx; i >= 0; i--) { + taosStopQueryImpl(pReqList[i]); + } + + taosStopQueryImpl(pRequest); + + tmpRefId = pRequest->relation.nextRefId; + while (tmpRefId) { + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + tmpRefId = pTmp->relation.nextRefId; + taosStopQueryImpl(pTmp); + releaseRequest(pTmp->self); + } else { + tscError("0x%" PRIx64 " is not there", tmpRefId); + break; + } + } +} + + void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } static void *tscCrashReportThreadFp(void *param) { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 2dddfec2bd0e1b73090a5c4532119e94da92b429..cbfa48b322fb0e0e8e2f6f8d58532ed68947a01e 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -464,6 +464,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { desc.useconds = now - pRequest->metric.start; desc.reqRid = pRequest->self; desc.stableQuery = pRequest->stableQuery; + desc.isSubQuery = pRequest->isSubReq; taosGetFqdn(desc.fqdn); desc.subPlanNum = pRequest->body.subplanNum; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index de142660999a30f6d934ff8b75d1bf7160d9a2e9..2a73156e8a03ffc4e46d159383f8bee1defd30cb 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -243,6 +243,7 @@ int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj pRequest->relation.prevRefId = (*pNewRequest)->self; (*pNewRequest)->relation.nextRefId = pRequest->self; (*pNewRequest)->relation.userRefId = pRequest->self; + (*pNewRequest)->isSubReq = true; } return code; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ca4acf3edf01c9f6b53fb00f28eb1540267434ef..7573fd59684900f797ee79c5b37a83087aae308a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -563,22 +563,13 @@ int taos_select_db(TAOS *taos, const char *db) { return code; } + void taos_stop_query(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return; } - SRequestObj *pRequest = (SRequestObj *)res; - pRequest->killed = true; - - // It is not a query, no need to stop. - if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) { - tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId); - return; - } - - schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED); - tscDebug("request %" PRIx64 " killed", pRequest->requestId); + stopAllQueries((SRequestObj*)res); } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { @@ -860,7 +851,7 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { SRequestObj* pNewRequest = NULL; SSqlCallbackWrapper* pNewWrapper = NULL; - int32_t code = buildPreviousRequest(pWrapper->pRequest, "", &pNewRequest); + int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); if (code) { handleQueryAnslyseRes(pWrapper, pResultMeta, code); return; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index db0cc78de682a948126fd476f4129406c50740b8..f751a9aa9b40a04bc14845fb1cdffa48f7b31fb2 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -381,6 +381,7 @@ static const SSysDbTableSchema querySchema[] = { {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false}, + {.name = "sub_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false}, {.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cd874dad507ed0ba0e41eeb8aa35e5cd0eef3dea..c7db62cdec0cd8377a68376c9742ca54dae5e98f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -224,6 +224,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1; + if (tEncodeI8(pEncoder, desc->isSubQuery) < 0) return -1; if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; @@ -291,6 +292,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery) < 0) return -1; if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index a1d815189c335215a3a812a50dfb5c6d96d64a84..460e75b42235d243f85996facef5919dd14290e9 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -834,6 +834,9 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->isSubQuery, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 94041140d4e60030a4b93a1d6a6b7b4df9eacd85..b2d1790fafce8128dac3a8cb54aa018663cbae4f 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -204,6 +204,8 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + taosSsleep(5); + if (pOperator->status == OP_EXEC_DONE) { return NULL; }