From 2011914dd66552f3641c95d639bc27afb099a160 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 15:44:20 +0800 Subject: [PATCH] serialze kill req --- include/common/tmsg.h | 7 +++- source/common/src/tmsg.c | 52 ++++++++++++++++++++++++ source/dnode/mnode/impl/src/mndProfile.c | 47 +++++++++++---------- 3 files changed, 84 insertions(+), 22 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 015678f193..4cd36acefa 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -964,10 +964,16 @@ typedef struct { int32_t queryId; } SKillQueryReq; +int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq); +int32_t tDeserializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq); + typedef struct { int32_t connId; } SKillConnReq; +int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); +int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); + typedef struct { char user[TSDB_USER_LEN]; char spi; @@ -1365,7 +1371,6 @@ typedef struct { int8_t precision; int8_t compressed; int32_t compLen; - int32_t numOfRows; char data[]; } SVShowTablesFetchRsp; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d450b0a6ca..7b9682df14 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2128,6 +2128,58 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} + +int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->connId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->queryId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->connId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->queryId) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +int32_t tSerializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->connId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->connId) < 0) return -1; + tEndDecode(&decoder); + tCoderClear(&decoder); return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 2b3bf6ca68..3504c6ced7 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -14,13 +14,13 @@ */ #define _DEFAULT_SOURCE -#include "tglobal.h" #include "mndProfile.h" #include "mndDb.h" -#include "mndStb.h" #include "mndMnode.h" #include "mndShow.h" +#include "mndStb.h" #include "mndUser.h" +#include "tglobal.h" #define QUERY_ID_SIZE 20 #define QUERY_OBJ_ID_SIZE 18 @@ -276,7 +276,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { return TSDB_CODE_SUCCESS; } -static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { +static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { #if 0 SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp)); if (pRsp == NULL) { @@ -350,14 +350,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { return -1; } - SArray *pArray = batchReq.reqs; - int32_t sz = taosArrayGetSize(pArray); - + SClientHbBatchRsp batchRsp = {0}; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); + int32_t sz = taosArrayGetSize(batchReq.reqs); for (int i = 0; i < sz; i++) { - SClientHbReq *pHbReq = taosArrayGet(pArray, i); + SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { int32_t kvNum = taosHashGetSize(pHbReq->info); if (NULL == pHbReq->info || kvNum <= 0) { @@ -409,7 +408,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } } } - taosArrayDestroyEx(pArray, tFreeClientHbReq); + taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp); void *buf = rpcMallocCont(tlen); @@ -517,19 +516,22 @@ static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) { } mndReleaseUser(pMnode, pUser); - SKillQueryReq *pKill = pReq->rpcMsg.pCont; - int32_t connId = htonl(pKill->connId); - int32_t queryId = htonl(pKill->queryId); - mInfo("kill query msg is received, queryId:%d", pKill->queryId); + SKillQueryReq killReq = {0}; + if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + mInfo("kill query msg is received, queryId:%d", killReq.queryId); - SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t)); if (pConn == NULL) { - mError("connId:%d, failed to kill queryId:%d, conn not exist", connId, queryId); + mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId); terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pReq->user); - pConn->queryId = queryId; + mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user); + pConn->queryId = killReq.queryId; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return 0; } @@ -548,16 +550,19 @@ static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) { } mndReleaseUser(pMnode, pUser); - SKillConnReq *pKill = pReq->rpcMsg.pCont; - int32_t connId = htonl(pKill->connId); + SKillConnReq killReq = {0}; + if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } - SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t)); if (pConn == NULL) { - mError("connId:%d, failed to kill connection, conn not exist", connId); + mError("connId:%d, failed to kill connection, conn not exist", killReq.connId); terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%d, is killed by user:%s", connId, pReq->user); + mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user); pConn->killed = 1; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return TSDB_CODE_SUCCESS; -- GitLab