提交 1afd59a9 编写于 作者: H Haojun Liao

[TD-2165]<fix>: support the cancel query by a new message type.

上级 56b0529d
...@@ -396,7 +396,8 @@ int doProcessSql(SSqlObj *pSql) { ...@@ -396,7 +396,8 @@ int doProcessSql(SSqlObj *pSql) {
pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_CONNECT ||
pCmd->command == TSDB_SQL_HB || pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_META ||
pCmd->command == TSDB_SQL_STABLEVGROUP) { pCmd->command == TSDB_SQL_STABLEVGROUP||
pCmd->command == TSDB_SQL_CANCEL_QUERY) {
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL); pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
} }
...@@ -454,7 +455,6 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -454,7 +455,6 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pRetrieveMsg->free = htons(pQueryInfo->type);
// todo valid the vgroupId at the client side // todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -1394,6 +1394,43 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { ...@@ -1394,6 +1394,43 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload;
pCancelMsg->qhandle = htobe64(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
if (pTableMetaInfo->pVgroupTables == NULL) {
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex);
} else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex);
}
} else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId);
}
pSql->cmd.payloadLen = sizeof(SCancelQueryMsg);
pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
return TSDB_CODE_SUCCESS;
}
int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SAlterDbMsg); pCmd->payloadLen = sizeof(SAlterDbMsg);
...@@ -2396,6 +2433,7 @@ void tscInitMsgsFp() { ...@@ -2396,6 +2433,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg; tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg; tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg; tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_CANCEL_QUERY] = tscBuildCancelQueryMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg; tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
......
...@@ -605,7 +605,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { ...@@ -605,7 +605,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
cmd == TSDB_SQL_RETRIEVE || cmd == TSDB_SQL_RETRIEVE ||
cmd == TSDB_SQL_FETCH)) { cmd == TSDB_SQL_FETCH)) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_CANCEL_QUERY;
tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]); tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]);
tscProcessSql(pSql); tscProcessSql(pSql);
......
...@@ -36,7 +36,8 @@ enum { ...@@ -36,7 +36,8 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CANCEL_QUERY, "cancel-query" ) // send cancel msg to vnode to stop query
// the SQL below is for mgmt node // the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
......
...@@ -41,6 +41,7 @@ int32_t dnodeInitShell() { ...@@ -41,6 +41,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
......
...@@ -45,7 +45,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) ...@@ -45,7 +45,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CANCEL_QUERY, "cancel-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
...@@ -502,9 +502,13 @@ typedef struct { ...@@ -502,9 +502,13 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
uint64_t qhandle; uint64_t qhandle;
uint16_t free;
} SRetrieveTableMsg; } SRetrieveTableMsg;
typedef struct {
SMsgHead header;
uint64_t qhandle;
} SCancelQueryMsg;
typedef struct SRetrieveTableRsp { typedef struct SRetrieveTableRsp {
int32_t numOfRows; int32_t numOfRows;
int8_t completed; // all results are returned to client int8_t completed; // all results are returned to client
......
...@@ -30,11 +30,14 @@ ...@@ -30,11 +30,14 @@
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
void vnodeInitReadFp(void) { void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_CANCEL_QUERY] = vnodeProcessCancelMsg;
} }
// //
...@@ -117,7 +120,8 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt ...@@ -117,7 +120,8 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
pRead->qtype = qtype; pRead->qtype = qtype;
if (pRead->msgType == TSDB_MSG_TYPE_CM_KILL_QUERY) { if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_CANCEL_QUERY) {
pRead->msgType = TSDB_MSG_TYPE_CANCEL_QUERY;
return vnodeWriteIntoCQueue(pVnode, pRead); return vnodeWriteIntoCQueue(pVnode, pRead);
} else { } else {
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
...@@ -199,27 +203,27 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -199,27 +203,27 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
// qHandle needs to be freed correctly // qHandle needs to be freed correctly
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { assert(pRead->code != TSDB_CODE_RPC_NETWORK_UNAVAIL);
SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pRead->pCont; // if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
killQueryMsg->free = htons(killQueryMsg->free); // SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg *)pRead->pCont;
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); //// pCancelMsg->free = htons(killQueryMsg->free);
// pCancelMsg->qhandle = htobe64(pCancelMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pRead->rpcHandle); //
assert(pRead->contLen > 0 && killQueryMsg->free == 1); // vWarn("QInfo:%p connection %p broken, kill query", (void *)pCancelMsg->qhandle, pRead->rpcHandle);
//// assert(pRead->contLen > 0 && pCancelMsg->free == 1);
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle); //
if (qhandle == NULL || *qhandle == NULL) { // void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)pCancelMsg->qhandle);
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle, // if (qhandle == NULL || *qhandle == NULL) {
pRead->rpcHandle); // vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)pCancelMsg->qhandle, pRead->rpcHandle);
} else { // } else {
assert(*qhandle == (void *)killQueryMsg->qhandle); // assert(*qhandle == (void *)pCancelMsg->qhandle);
//
qKillQuery(*qhandle); // qKillQuery(*qhandle);
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); // qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
} // }
//
return TSDB_CODE_TSC_QUERY_CANCELLED; // return TSDB_CODE_TSC_QUERY_CANCELLED;
} // }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void ** handle = NULL; void ** handle = NULL;
...@@ -341,20 +345,21 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -341,20 +345,21 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle); vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet); vnodeBuildNoResultQueryRsp(pRet);
return code; return code;
} }
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
vnodeBuildNoResultQueryRsp(pRet); assert(pRetrieve->free != 1);
code = TSDB_CODE_TSC_QUERY_CANCELLED; // if (pRetrieve->free == 1) {
return code; // vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
} // qKillQuery(*handle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
//
// vnodeBuildNoResultQueryRsp(pRet);
// code = TSDB_CODE_TSC_QUERY_CANCELLED;
// return code;
// }
// register the qhandle to connect to quit query immediate if connection is broken // register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
...@@ -404,12 +409,48 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -404,12 +409,48 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// notify connection(handle) that current qhandle is created, if current connection from // notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately. // client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); SCancelQueryMsg *pCancelMsg = rpcMallocCont(sizeof(SCancelQueryMsg));
killQueryMsg->qhandle = htobe64((uint64_t)qhandle); pCancelMsg->qhandle = htobe64((uint64_t)qhandle);
killQueryMsg->free = htons(1); pCancelMsg->header.vgId = htonl(vgId);
killQueryMsg->header.vgId = htonl(vgId); pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg));
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg)); return rpcReportProgress(handle, (char *)pCancelMsg, sizeof(SRetrieveTableMsg));
}
int32_t vnodeProcessCancelMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
void *pCont = pRead->pCont;
SRspRet *pRet = &pRead->rspRet;
SCancelQueryMsg *pCancel = pCont;
pCancel->qhandle = htobe64(pCancel->qhandle);
vDebug("vgId:%d, QInfo:%p, cancel query msg is disposed, conn:%p", pVnode->vgId, (void *)pCancel->qhandle,
pRead->rpcHandle);
memset(pRet, 0, sizeof(SRspRet));
terrno = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
void ** handle = qAcquireQInfo(pVnode->qMgmt, pCancel->qhandle);
if (handle == NULL) {
code = terrno;
terrno = TSDB_CODE_SUCCESS;
} else if ((*handle) != (void *)pCancel->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
}
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in cancel query, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pCancel->qhandle);
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
vWarn("vgId:%d, QInfo:%p, cancel-query msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
vnodeBuildNoResultQueryRsp(pRet);
code = TSDB_CODE_TSC_QUERY_CANCELLED;
return code;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册