From 1afd59a9a21b2da4ed5c3da8b34f6c6a547de869 Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Sat, 5 Dec 2020 23:10:49 +0800
Subject: [PATCH] [TD-2165]: support the cancel query by a new message type.

---
 src/client/src/tscServer.c | 44 +++++++++++++++++-
 src/client/src/tscSql.c    |  2 +-
 src/common/inc/tcmdtype.h  |  3 +-
 src/dnode/src/dnodeShell.c |  1 +
 src/inc/taosmsg.h          |  8 +++-
 src/vnode/src/vnodeRead.c  | 92 ++++++++++++++++++++++----------------
 6 files changed, 105 insertions(+), 45 deletions(-)

diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index 59bcdd691d..40af86d4a2 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -396,7 +396,8 @@ int doProcessSql(SSqlObj *pSql) {
       pCmd->command == TSDB_SQL_CONNECT ||
       pCmd->command == TSDB_SQL_HB ||
       pCmd->command == TSDB_SQL_META ||
-      pCmd->command == TSDB_SQL_STABLEVGROUP) {
+      pCmd->command == TSDB_SQL_STABLEVGROUP ||
+      pCmd->command == TSDB_SQL_CANCEL_QUERY) {
     pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
   }
 
@@ -454,7 +455,6 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
   pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
 
   SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
-  pRetrieveMsg->free = htons(pQueryInfo->type);
 
   // todo valid the vgroupId at the client side
   STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@@ -1394,6 +1394,45 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
   return TSDB_CODE_SUCCESS;
 }
 
+// Build a SCancelQueryMsg in the command payload: it carries the query handle and is
+// addressed to the vgroup selected by the current vgroupIndex (or the table's own vgroup).
+int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
+  SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
+  pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
+
+  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
+  STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+
+  if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
+    int32_t vgIndex = pTableMetaInfo->vgroupIndex;
+    if (pTableMetaInfo->pVgroupTables == NULL) {
+      SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
+      assert(vgIndex >= 0 && vgIndex < pVgroupInfo->numOfVgroups && pVgroupInfo->vgroups[vgIndex].vgId > 0);
+
+      pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
+      tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
+    } else {
+      int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
+      assert(vgIndex >= 0 && vgIndex < numOfVgroups);
+
+      SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
+
+      pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
+      tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
+    }
+  } else {
+    STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
+    pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
+    tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
+  }
+
+  pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
+  pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
+
+  pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
+  return TSDB_CODE_SUCCESS;
+}
+
 int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
   SSqlCmd *pCmd = &pSql->cmd;
   pCmd->payloadLen = sizeof(SAlterDbMsg);
@@ -2396,6 +2435,7 @@ void tscInitMsgsFp() {
   tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
   tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
   tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
+  tscBuildMsg[TSDB_SQL_CANCEL_QUERY] = tscBuildCancelQueryMsg;
   tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
 
   tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c
index fae5b5856f..4286aed4e9 100644
--- a/src/client/src/tscSql.c
+++ b/src/client/src/tscSql.c
@@ -605,7 +605,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
        cmd == TSDB_SQL_RETRIEVE ||
        cmd == TSDB_SQL_FETCH)) {
     pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
-    pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
+    pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_CANCEL_QUERY;
     tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]);
     tscProcessSql(pSql);
 
diff --git a/src/common/inc/tcmdtype.h b/src/common/inc/tcmdtype.h
index 69bbccd67e..473af6bcca 100644
--- a/src/common/inc/tcmdtype.h
+++ b/src/common/inc/tcmdtype.h
@@ -36,7 +36,8 @@ enum {
   TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
   TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
   TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
-
+  TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CANCEL_QUERY, "cancel-query" ) // send cancel msg to vnode to stop query
+
   // the SQL below is for mgmt node
   TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
   TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c
index 89f657f789..d65e5b69a2 100644
--- a/src/dnode/src/dnodeShell.c
+++ b/src/dnode/src/dnodeShell.c
@@ -41,6 +41,7 @@ int32_t dnodeInitShell() {
   dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
   dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
   dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
+  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = dnodeDispatchToVReadQueue;
   dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
 
   // the following message shall be treated as mnode write
diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h
index e8e3029244..bfbe535dc5 100644
--- a/src/inc/taosmsg.h
+++ b/src/inc/taosmsg.h
@@ -45,7 +45,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
-TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
+TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CANCEL_QUERY, "cancel-query" )
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
 
@@ -502,9 +502,13 @@ typedef struct {
 typedef struct {
   SMsgHead header;
   uint64_t qhandle;
-  uint16_t free;
 } SRetrieveTableMsg;
 
+typedef struct {
+  SMsgHead header;
+  uint64_t qhandle;
+} SCancelQueryMsg;
+
 typedef struct SRetrieveTableRsp {
   int32_t numOfRows;
   int8_t completed; // all results are returned to client
diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c
index ec5ba8f352..af67e24b90 100644
--- a/src/vnode/src/vnodeRead.c
+++ b/src/vnode/src/vnodeRead.c
@@ -30,11 +30,14 @@
 static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
 static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
 static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
+static int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
+
 static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
 
 void vnodeInitReadFp(void) {
   vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
   vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
+  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = vnodeProcessCancelMsg;
 }
 
 //
@@ -117,7 +120,9 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
 
   pRead->qtype = qtype;
 
-  if (pRead->msgType == TSDB_MSG_TYPE_CM_KILL_QUERY) {
+  // broken-connection notifications (see vnodeNotifyCurrentQhandle) are handled as cancel-query requests
+  if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_CANCEL_QUERY) {
+    pRead->msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
     return vnodeWriteIntoCQueue(pVnode, pRead);
   } else {
     atomic_add_fetch_32(&pVnode->refCount, 1);
@@ -199,27 +204,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
   memset(pRet, 0, sizeof(SRspRet));
 
   // qHandle needs to be freed correctly
-  if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
-    SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pRead->pCont;
-    killQueryMsg->free = htons(killQueryMsg->free);
-    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
-
-    vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pRead->rpcHandle);
-    assert(pRead->contLen > 0 && killQueryMsg->free == 1);
-
-    void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
-    if (qhandle == NULL || *qhandle == NULL) {
-      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
-            pRead->rpcHandle);
-    } else {
-      assert(*qhandle == (void *)killQueryMsg->qhandle);
-
-      qKillQuery(*qhandle);
-      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
-    }
-
-    return TSDB_CODE_TSC_QUERY_CANCELLED;
-  }
+  assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL);
 
   int32_t code = TSDB_CODE_SUCCESS;
   void ** handle = NULL;
@@ -341,20 +326,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
   }
 
   if (code != TSDB_CODE_SUCCESS) {
-    vError("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle);
+    vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle);
     vnodeBuildNoResultQueryRsp(pRet);
     return code;
   }
 
-  if (pRetrieve->free == 1) {
-    vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
-    qKillQuery(*handle);
-    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
-
-    vnodeBuildNoResultQueryRsp(pRet);
-    code = TSDB_CODE_TSC_QUERY_CANCELLED;
-    return code;
-  }
-
   // register the qhandle to connect to quit query immediate if connection is broken
   if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
@@ -404,12 +379,51 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
 // notify connection(handle) that current qhandle is created, if current connection from
 // client is broken, the query needs to be killed immediately.
 int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
-  SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
-  killQueryMsg->qhandle = htobe64((uint64_t)qhandle);
-  killQueryMsg->free = htons(1);
-  killQueryMsg->header.vgId = htonl(vgId);
-  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
+  SCancelQueryMsg *pCancelMsg = rpcMallocCont(sizeof(SCancelQueryMsg));
+  pCancelMsg->qhandle = htobe64((uint64_t)qhandle);
+  pCancelMsg->header.vgId = htonl(vgId);
+  pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
 
   vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
-  return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg));
+  return rpcReportProgress(handle, (char *)pCancelMsg, sizeof(SCancelQueryMsg));
+}
+
+// Handle TSDB_MSG_TYPE_CANCEL_QUERY: look up the qhandle carried by the message, kill the
+// matching query and release the handle. Returns TSDB_CODE_TSC_QUERY_CANCELLED after the
+// query is killed, or the lookup error code when no matching handle is found.
+static int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
+  void *pCont = pRead->pCont;
+  SRspRet *pRet = &pRead->rspRet;
+
+  SCancelQueryMsg *pCancel = pCont;
+  pCancel->qhandle = htobe64(pCancel->qhandle);
+
+  vDebug("vgId:%d, QInfo:%p, cancel query msg is disposed, conn:%p", pVnode->vgId, (void *)pCancel->qhandle,
+         pRead->rpcHandle);
+
+  memset(pRet, 0, sizeof(SRspRet));
+
+  terrno = TSDB_CODE_SUCCESS;
+  int32_t code = TSDB_CODE_SUCCESS;
+  void ** handle = qAcquireQInfo(pVnode->qMgmt, pCancel->qhandle);
+  if (handle == NULL) {
+    code = terrno;
+    terrno = TSDB_CODE_SUCCESS;
+  } else if ((*handle) != (void *)pCancel->qhandle) {
+    code = TSDB_CODE_QRY_INVALID_QHANDLE;
+  }
+
+  if (code != TSDB_CODE_SUCCESS) {
+    vError("vgId:%d, invalid handle in cancel query, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pCancel->qhandle);
+    vnodeBuildNoResultQueryRsp(pRet);
+    return code;
+  }
+
+  vWarn("vgId:%d, QInfo:%p, cancel-query msg received to kill query and free qhandle", pVnode->vgId, *handle);
+  qKillQuery(*handle);
+  qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
+
+  vnodeBuildNoResultQueryRsp(pRet);
+  code = TSDB_CODE_TSC_QUERY_CANCELLED;
+  return code;
 }
-- 
GitLab