From e51f8e5f51e53033beb2ff90fe5b76551107f3cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Dec 2020 23:06:24 +0800 Subject: [PATCH] [TD-2165]: fix the bug that query can not be stopped if all query threads are busy. --- src/client/src/tscServer.c | 80 +++++++++++++++---------------- src/client/src/tscSql.c | 2 +- src/client/src/tscSubquery.c | 31 +++++++++++- src/common/inc/tcmdtype.h | 1 - src/dnode/src/dnodeShell.c | 1 - src/dnode/src/dnodeVRead.c | 8 ++-- src/inc/taosmsg.h | 7 +-- src/query/inc/qExecutor.h | 6 +-- src/query/src/qExecutor.c | 71 +++++++++++++++++++-------- src/vnode/src/vnodeRead.c | 93 +++++++++++------------------------- 10 files changed, 156 insertions(+), 144 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ed761a92f1..960f2561e2 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -280,6 +280,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } int32_t cmd = pCmd->command; + // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { pSql->cmd.submitSchema = 1; @@ -395,8 +396,7 @@ int doProcessSql(SSqlObj *pSql) { pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB || pCmd->command == TSDB_SQL_META || - pCmd->command == TSDB_SQL_STABLEVGROUP|| - pCmd->command == TSDB_SQL_CANCEL_QUERY) { + pCmd->command == TSDB_SQL_STABLEVGROUP) { pRes->code = tscBuildMsg[pCmd->command](pSql, NULL); } @@ -451,9 +451,10 @@ int tscProcessSql(SSqlObj *pSql) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; - pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + pRetrieveMsg->free = htons(pQueryInfo->type); + pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); // todo valid the vgroupId at the client side STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -1393,42 +1394,42 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload; - pCancelMsg->qhandle = htobe64(pSql->res.qhandle); - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - int32_t vgIndex = pTableMetaInfo->vgroupIndex; - if (pTableMetaInfo->pVgroupTables == NULL) { - SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; - assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); - - pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); - tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); - } else { - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); - assert(vgIndex >= 0 && vgIndex < numOfVgroups); - - SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); - - pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); - tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); - } - } else { - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); - tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); - } - - pSql->cmd.payloadLen = sizeof(SCancelQueryMsg); - pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY; - - pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg)); - return TSDB_CODE_SUCCESS; -} +//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { +// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload; +// pCancelMsg->qhandle = htobe64(pSql->res.qhandle); +// +// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); +// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); +// +// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { +// int32_t vgIndex = pTableMetaInfo->vgroupIndex; +// if (pTableMetaInfo->pVgroupTables == NULL) { +// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; +// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); +// +// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); +// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); +// } else { +// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); +// assert(vgIndex >= 0 && vgIndex < numOfVgroups); +// +// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); +// +// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); +// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); +// } +// } else { +// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; +// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); +// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); +// } +// +// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg); +// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY; +// +// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg)); +// return TSDB_CODE_SUCCESS; +//} int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; @@ -2432,7 +2433,6 @@ void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg; - tscBuildMsg[TSDB_SQL_CANCEL_QUERY] = tscBuildCancelQueryMsg; tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index b501241a89..d7dec2f356 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -605,7 +605,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { cmd == TSDB_SQL_RETRIEVE || cmd == TSDB_SQL_FETCH)) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_CANCEL_QUERY; + pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]); tscProcessSql(pSql); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 59879d86d1..973f21c92b 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2149,6 +2149,29 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } +static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { + if (pParentObj->retry > pParentObj->maxRetry) { + tscError("%p max retry reached, abort the retry effort", pParentObj) + return false; + } + + for (int32_t i = 0; i < numOfSub; ++i) { + int32_t code = pParentObj->pSubs[i]->res.code; + if (code == TSDB_CODE_SUCCESS) { + continue; + } + + if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE && code != TSDB_CODE_TDB_INVALID_TABLE_ID && + code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && + code != TSDB_CODE_APP_NOT_READY) { + pParentObj->res.code = code; + return false; + } + } + + return true; +} + static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; @@ -2190,8 +2213,12 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; (*pParentObj->fp)(pParentObj->param, pParentObj, v); } else { - int32_t numOfFailed = 0; + if (!needRetryInsert(pParentObj, numOfSub)) { + tscQueueAsyncRes(pParentObj); + return; + } + int32_t numOfFailed = 0; for(int32_t i = 0; i < numOfSub; ++i) { SSqlObj* pSql = pParentObj->pSubs[i]; if (pSql->res.code != TSDB_CODE_SUCCESS) { @@ -2221,7 +2248,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) tscResetSqlCmdObj(&pParentObj->cmd, false); - tscDebug("%p re-parse sql to generate data", pParentObj); + tscDebug("%p re-parse sql to generate submit data, retry:%d", pParentObj, pParentObj->retry++); int32_t code = tsParseSql(pParentObj, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; diff --git a/src/common/inc/tcmdtype.h b/src/common/inc/tcmdtype.h index 473af6bcca..bec8590536 100644 --- a/src/common/inc/tcmdtype.h +++ b/src/common/inc/tcmdtype.h @@ -36,7 +36,6 @@ enum { TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" ) - TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CANCEL_QUERY, "cancel-query" ) // send cancel msg to vnode to stop query // the SQL below is for mgmt node TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 6490992cae..d76af4e3dc 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -35,7 +35,6 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; // the following message shall be treated as mnode write diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 46a21c1240..0d4add2a5c 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -29,14 +29,14 @@ int32_t dnodeInitVRead() { tsVQueryWP.name = "vquery"; tsVQueryWP.workerFp = dnodeProcessReadQueue; tsVQueryWP.min = tsNumOfCores; - tsVQueryWP.max = tsNumOfCores * tsNumOfThreadsPerCore; - if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min; + tsVQueryWP.max = tsNumOfCores/* * tsNumOfThreadsPerCore*/; +// if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min; if (tWorkerInit(&tsVQueryWP) != 0) return -1; tsVFetchWP.name = "vfetch"; tsVFetchWP.workerFp = dnodeProcessReadQueue; - tsVFetchWP.min = 1; - tsVFetchWP.max = 1; + tsVFetchWP.min = MIN(4, tsNumOfCores); + tsVFetchWP.max = tsVFetchWP.min; if (tWorkerInit(&tsVFetchWP) != 0) return -1; return 0; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f2057c3094..72a4e4c234 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -45,7 +45,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CANCEL_QUERY, "cancel-query" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) @@ -502,11 +502,6 @@ typedef struct { uint16_t free; } SRetrieveTableMsg; -typedef struct { - SMsgHead header; - uint64_t qhandle; -} SCancelQueryMsg; - typedef struct SRetrieveTableRsp { int32_t numOfRows; int8_t completed; // all results are returned to client diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index f73ac246ca..9b29ad909a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -191,7 +191,7 @@ typedef struct SQueryRuntimeEnv { void* pQueryHandle; void* pSecQueryHandle; // another thread for bool stableQuery; // super table query or not - bool topBotQuery; // false + bool topBotQuery; // TODO used bitwise flag bool groupbyNormalCol; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation @@ -216,14 +216,13 @@ enum { typedef struct SQInfo { void* signature; int32_t code; // error code to returned to client - int64_t owner; // if it is in execution + int64_t owner; // if it is in execution void* tsdb; SMemRef memRef; int32_t vgId; STableGroupInfo tableGroupInfo; // table list SArray STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SQueryRuntimeEnv runtimeEnv; -// SArray* arrTableIdInfo; SHashObj* arrTableIdInfo; int32_t groupIndex; @@ -239,6 +238,7 @@ typedef struct SQInfo { tsem_t ready; int32_t dataReady; // denote if query result is ready or not void* rspContext; // response context + int64_t startExecTs; // start to exec timestamp } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 406e99f6ef..92de3fb84a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -128,11 +128,14 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) +#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) static void setQueryStatus(SQuery *pQuery, int8_t status); static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); -#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) +static int32_t getMaximumIdleDurationSec() { + return tsShellActivityTimer * 2; +} static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -2138,8 +2141,31 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); } +static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { + return pQInfo->rspContext != NULL; +} + #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) +static bool isQueryKilled(SQInfo *pQInfo) { + if (IS_QUERY_KILLED(pQInfo)) { + return true; + } + + // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived + // abort current query execution. + if (pQInfo->owner != 0 && ((taosGetTimestampSec() - pQInfo->startExecTs) > getMaximumIdleDurationSec()) && + (!needBuildResAfterQueryComplete(pQInfo))) { + + assert(pQInfo->startExecTs != 0); + qDebug("QInfo:%p retrieve not arrive beyond %d sec, abort current query execution, start:%"PRId64", current:%d", pQInfo, 1, + pQInfo->startExecTs, taosGetTimestampSec()); + return true; + } + + return false; +} + static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { @@ -2864,7 +2890,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; - if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -3432,7 +3458,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { int64_t startt = taosGetTimestampMs(); while (1) { - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); tfree(pTableList); @@ -4018,7 +4044,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { cond.twindow.skey, cond.twindow.ekey); // check if query is killed or not - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } } @@ -4675,7 +4701,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; while (tsdbNextDataBlock(pQueryHandle)) { - if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5112,7 +5138,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5491,7 +5517,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) { summary->totalBlocks += 1; - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5622,7 +5648,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5808,7 +5834,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el); // query error occurred or query is killed, abort current execution - if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5829,7 +5855,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_COMPLETED); - if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); //TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -5945,7 +5971,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->rec.rows = getNumOfResult(pRuntimeEnv); doSecondaryArithmeticProcess(pQuery); - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -7521,7 +7547,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { pthread_mutex_lock(&pQInfo->lock); pQInfo->dataReady = QUERY_RESULT_READY; - buildRes = (pQInfo->rspContext != NULL); + buildRes = needBuildResAfterQueryComplete(pQInfo); // clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is // put into task to be executed. @@ -7530,6 +7556,7 @@ static bool doBuildResCheck(SQInfo* pQInfo) { pthread_mutex_unlock(&pQInfo->lock); + // used in retrieve blocking model. tsem_post(&pQInfo->ready); return buildRes; } @@ -7546,7 +7573,9 @@ bool qTableQuery(qinfo_t qinfo) { return false; } - if (IS_QUERY_KILLED(pQInfo)) { + pQInfo->startExecTs = taosGetTimestampSec(); + + if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); return doBuildResCheck(pQInfo); } @@ -7578,7 +7607,7 @@ bool qTableQuery(qinfo_t qinfo) { } SQuery* pQuery = pRuntimeEnv->pQuery; - if (IS_QUERY_KILLED(pQInfo)) { + if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p query is killed", pQInfo); } else if (pQuery->rec.rows == 0) { qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); @@ -7607,6 +7636,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex int32_t code = TSDB_CODE_SUCCESS; if (tsHalfCoresForQuery) { + pQInfo->rspContext = pRspContext; tsem_wait(&pQInfo->ready); *buildRes = true; code = pQInfo->code; @@ -7614,12 +7644,12 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex SQuery *pQuery = pQInfo->runtimeEnv.pQuery; pthread_mutex_lock(&pQInfo->lock); - assert(pQInfo->rspContext == NULL); + assert(pQInfo->rspContext == NULL); if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; - qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%d", pQInfo, pQuery->rowSize, - pQuery->rec.rows, pQInfo->code); + qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->rowSize, + pQuery->rec.rows, tstrerror(pQInfo->code)); } else { *buildRes = false; qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); @@ -7697,7 +7727,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) { } SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - return IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER); + return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER); } int32_t qKillQuery(qinfo_t qinfo) { @@ -7994,8 +8024,6 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { return NULL; } - const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2 * 1000; - SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo); @@ -8011,7 +8039,8 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { return NULL; } else { TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo; - void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_QHANDLE_LIFE_SPAN); + void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), + (getMaximumIdleDurationSec()*1000)); // pthread_mutex_unlock(&pQueryMgmt->lock); return handle; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index c1caf291b4..03d1272771 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -24,15 +24,12 @@ static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); -static int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); int32_t vnodeInitRead(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; - vnodeProcessReadMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = vnodeProcessCancelMsg; - return 0; } @@ -120,8 +117,7 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1); - if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_CANCEL_QUERY || - pRead->msgType == TSDB_MSG_TYPE_FETCH) { + if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); return taosWriteQitem(pVnode->fqueue, qtype, pRead); } else { @@ -202,20 +198,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly - assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL); + if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + vError("error rpc msg in query, %s", tstrerror(pRead->code)); + } +// assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL); // if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { -// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg *)pRead->pCont; -//// pCancelMsg->free = htons(killQueryMsg->free); -// pCancelMsg->qhandle = htobe64(pCancelMsg->qhandle); +// SCancelQueryMsg *pMsg = (SCancelQueryMsg *)pRead->pCont; +//// pMsg->free = htons(killQueryMsg->free); +// pMsg->qhandle = htobe64(pMsg->qhandle); // -// vWarn("QInfo:%p connection %p broken, kill query", (void *)pCancelMsg->qhandle, pRead->rpcHandle); -//// assert(pRead->contLen > 0 && pCancelMsg->free == 1); +// vWarn("QInfo:%p connection %p broken, kill query", (void *)pMsg->qhandle, pRead->rpcHandle); +//// assert(pRead->contLen > 0 && pMsg->free == 1); // -// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pCancelMsg->qhandle); +// void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pMsg->qhandle); // if (qhandle == NULL || *qhandle == NULL) { -// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pCancelMsg->qhandle, pRead->rpcHandle); +// vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pMsg->qhandle, pRead->rpcHandle); // } else { -// assert(*qhandle == (void *)pCancelMsg->qhandle); +// assert(*qhandle == (void *)pMsg->qhandle); // // qKillQuery(*qhandle); // qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); @@ -349,16 +348,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { return code; } - assert(pRetrieve->free != 1); -// if (pRetrieve->free == 1) { -// vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); -// qKillQuery(*handle); -// qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); -// -// vnodeBuildNoResultQueryRsp(pRet); -// code = TSDB_CODE_TSC_QUERY_CANCELLED; -// return code; -// } + // kill current query and free corresponding resources. + if (pRetrieve->free == 1) { + vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); + qKillQuery(*handle); + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); + + vnodeBuildNoResultQueryRsp(pRet); + code = TSDB_CODE_TSC_QUERY_CANCELLED; + return code; + } // register the qhandle to connect to quit query immediate if connection is broken if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { @@ -406,47 +405,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // notify connection(handle) that current qhandle is created, if current connection from // client is broken, the query needs to be killed immediately. int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { - SCancelQueryMsg *pCancelMsg = rpcMallocCont(sizeof(SCancelQueryMsg)); - pCancelMsg->qhandle = htobe64((uint64_t)qhandle); - pCancelMsg->header.vgId = htonl(vgId); - pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg)); + SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); + pMsg->qhandle = htobe64((uint64_t)qhandle); + pMsg->header.vgId = htonl(vgId); + pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); - return rpcReportProgress(handle, (char *)pCancelMsg, sizeof(SCancelQueryMsg)); -} - -int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { - void *pCont = pRead->pCont; - SRspRet *pRet = &pRead->rspRet; - - SCancelQueryMsg *pCancel = pCont; - pCancel->qhandle = htobe64(pCancel->qhandle); - - vDebug("vgId:%d, QInfo:%p, cancel query msg is disposed, conn:%p", pVnode->vgId, (void *)pCancel->qhandle, - pRead->rpcHandle); - - memset(pRet, 0, sizeof(SRspRet)); - - terrno = TSDB_CODE_SUCCESS; - int32_t code = TSDB_CODE_SUCCESS; - void ** handle = qAcquireQInfo(pVnode->qMgmt, pCancel->qhandle); - if (handle == NULL) { - code = terrno; - terrno = TSDB_CODE_SUCCESS; - } else if ((*handle) != (void *)pCancel->qhandle) { - code = TSDB_CODE_QRY_INVALID_QHANDLE; - } - - if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, invalid handle in cancel query, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pCancel->qhandle); - vnodeBuildNoResultQueryRsp(pRet); - return code; - } - - vWarn("vgId:%d, QInfo:%p, cancel-query msg received to kill query and free qhandle", pVnode->vgId, *handle); - qKillQuery(*handle); - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); - - vnodeBuildNoResultQueryRsp(pRet); - return TSDB_CODE_TSC_QUERY_CANCELLED; + return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); } -- GitLab