提交 59b8cfbc 编写于 作者: D dapan1121

enh: add sub_query in systable and kill subquery

上级 170182fe
...@@ -2488,6 +2488,7 @@ typedef struct { ...@@ -2488,6 +2488,7 @@ typedef struct {
int64_t stime; // timestamp precision ms int64_t stime; // timestamp precision ms
int64_t reqRid; int64_t reqRid;
bool stableQuery; bool stableQuery;
bool isSubQuery;
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
int32_t subPlanNum; int32_t subPlanNum;
SArray* subDesc; // SArray<SQuerySubDesc> SArray* subDesc; // SArray<SQuerySubDesc>
......
...@@ -256,6 +256,7 @@ typedef struct SRequestObj { ...@@ -256,6 +256,7 @@ typedef struct SRequestObj {
bool validateOnly; // todo refactor bool validateOnly; // todo refactor
bool killed; bool killed;
bool inRetry; bool inRetry;
bool isSubReq;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId; int64_t allocatorRefId;
...@@ -398,6 +399,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code); ...@@ -398,6 +399,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code);
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest); int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest);
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce); int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce);
void returnToUser(SRequestObj* pRequest); void returnToUser(SRequestObj* pRequest);
void stopAllQueries(SRequestObj *pRequest);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -363,6 +363,11 @@ void destroySubRequests(SRequestObj *pRequest) { ...@@ -363,6 +363,11 @@ void destroySubRequests(SRequestObj *pRequest) {
int32_t reqIdx = -1; int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL}; SRequestObj *pReqList[16] = {NULL};
uint64_t tmpRefId = 0; uint64_t tmpRefId = 0;
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
return;
}
SRequestObj* pTmp = pRequest; SRequestObj* pTmp = pRequest;
while (pTmp->relation.prevRefId) { while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId; tmpRefId = pTmp->relation.prevRefId;
...@@ -454,6 +459,63 @@ void destroyRequest(SRequestObj *pRequest) { ...@@ -454,6 +459,63 @@ void destroyRequest(SRequestObj *pRequest) {
removeRequest(pRequest->self); removeRequest(pRequest->self);
} }
void taosStopQueryImpl(SRequestObj *pRequest) {
pRequest->killed = true;
// It is not a query, no need to stop.
if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}
void stopAllQueries(SRequestObj *pRequest) {
int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL};
uint64_t tmpRefId = 0;
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
return;
}
SRequestObj* pTmp = pRequest;
while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
tmpRefId, pTmp->requestId);
break;
}
}
for (int32_t i = reqIdx; i >= 0; i--) {
taosStopQueryImpl(pReqList[i]);
}
taosStopQueryImpl(pRequest);
tmpRefId = pRequest->relation.nextRefId;
while (tmpRefId) {
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
tmpRefId = pTmp->relation.nextRefId;
taosStopQueryImpl(pTmp);
releaseRequest(pTmp->self);
} else {
tscError("0x%" PRIx64 " is not there", tmpRefId);
break;
}
}
}
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
static void *tscCrashReportThreadFp(void *param) { static void *tscCrashReportThreadFp(void *param) {
......
...@@ -464,6 +464,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { ...@@ -464,6 +464,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
desc.useconds = now - pRequest->metric.start; desc.useconds = now - pRequest->metric.start;
desc.reqRid = pRequest->self; desc.reqRid = pRequest->self;
desc.stableQuery = pRequest->stableQuery; desc.stableQuery = pRequest->stableQuery;
desc.isSubQuery = pRequest->isSubReq;
taosGetFqdn(desc.fqdn); taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.subplanNum; desc.subPlanNum = pRequest->body.subplanNum;
......
...@@ -243,6 +243,7 @@ int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj ...@@ -243,6 +243,7 @@ int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj
pRequest->relation.prevRefId = (*pNewRequest)->self; pRequest->relation.prevRefId = (*pNewRequest)->self;
(*pNewRequest)->relation.nextRefId = pRequest->self; (*pNewRequest)->relation.nextRefId = pRequest->self;
(*pNewRequest)->relation.userRefId = pRequest->self; (*pNewRequest)->relation.userRefId = pRequest->self;
(*pNewRequest)->isSubReq = true;
} }
return code; return code;
} }
......
...@@ -563,22 +563,13 @@ int taos_select_db(TAOS *taos, const char *db) { ...@@ -563,22 +563,13 @@ int taos_select_db(TAOS *taos, const char *db) {
return code; return code;
} }
void taos_stop_query(TAOS_RES *res) { void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
return; return;
} }
SRequestObj *pRequest = (SRequestObj *)res; stopAllQueries((SRequestObj*)res);
pRequest->killed = true;
// It is not a query, no need to stop.
if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
tscDebug("request %" PRIx64 " killed", pRequest->requestId);
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
...@@ -860,7 +851,7 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { ...@@ -860,7 +851,7 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) {
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) {
SRequestObj* pNewRequest = NULL; SRequestObj* pNewRequest = NULL;
SSqlCallbackWrapper* pNewWrapper = NULL; SSqlCallbackWrapper* pNewWrapper = NULL;
int32_t code = buildPreviousRequest(pWrapper->pRequest, "", &pNewRequest); int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest);
if (code) { if (code) {
handleQueryAnslyseRes(pWrapper, pResultMeta, code); handleQueryAnslyseRes(pWrapper, pResultMeta, code);
return; return;
......
...@@ -381,6 +381,7 @@ static const SSysDbTableSchema querySchema[] = { ...@@ -381,6 +381,7 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false}, {.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false},
{.name = "sub_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false},
{.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......
...@@ -224,6 +224,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR ...@@ -224,6 +224,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; if (tEncodeI64(pEncoder, desc->stime) < 0) return -1;
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1;
if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1; if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1;
if (tEncodeI8(pEncoder, desc->isSubQuery) < 0) return -1;
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1;
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1;
...@@ -291,6 +292,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) ...@@ -291,6 +292,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1;
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1; if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery) < 0) return -1;
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1;
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1;
......
...@@ -834,6 +834,9 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -834,6 +834,9 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->isSubQuery, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
......
...@@ -204,6 +204,8 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { ...@@ -204,6 +204,8 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
taosSsleep(5);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册