提交 2011914d 编写于 作者: S Shengliang Guan

serialze kill req

上级 246311d4
......@@ -964,10 +964,16 @@ typedef struct {
int32_t queryId;
} SKillQueryReq;
int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
int32_t tDeserializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
typedef struct {
int32_t connId;
} SKillConnReq;
int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
typedef struct {
char user[TSDB_USER_LEN];
char spi;
......@@ -1365,7 +1371,6 @@ typedef struct {
int8_t precision;
int8_t compressed;
int32_t compLen;
int32_t numOfRows;
char data[];
} SVShowTablesFetchRsp;
......
......@@ -2128,6 +2128,58 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->connId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->queryId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->connId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->queryId) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->connId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->connId) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
\ No newline at end of file
......@@ -14,13 +14,13 @@
*/
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "mndProfile.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndUser.h"
#include "tglobal.h"
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
......@@ -276,7 +276,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
return TSDB_CODE_SUCCESS;
}
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
#if 0
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
if (pRsp == NULL) {
......@@ -350,14 +350,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
return -1;
}
SArray *pArray = batchReq.reqs;
int32_t sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
int32_t sz = taosArrayGetSize(batchReq.reqs);
for (int i = 0; i < sz; i++) {
SClientHbReq *pHbReq = taosArrayGet(pArray, i);
SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) {
......@@ -409,7 +408,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
}
}
}
taosArrayDestroyEx(pArray, tFreeClientHbReq);
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
void *buf = rpcMallocCont(tlen);
......@@ -517,19 +516,22 @@ static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
}
mndReleaseUser(pMnode, pUser);
SKillQueryReq *pKill = pReq->rpcMsg.pCont;
int32_t connId = htonl(pKill->connId);
int32_t queryId = htonl(pKill->queryId);
mInfo("kill query msg is received, queryId:%d", pKill->queryId);
SKillQueryReq killReq = {0};
if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
mInfo("kill query msg is received, queryId:%d", killReq.queryId);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%d, failed to kill queryId:%d, conn not exist", connId, queryId);
mError("connId:%d, failed to kill queryId:%d, conn not exist", killReq.connId, killReq.queryId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pReq->user);
pConn->queryId = queryId;
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user);
pConn->queryId = killReq.queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return 0;
}
......@@ -548,16 +550,19 @@ static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
}
mndReleaseUser(pMnode, pUser);
SKillConnReq *pKill = pReq->rpcMsg.pCont;
int32_t connId = htonl(pKill->connId);
SKillConnReq killReq = {0};
if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &killReq.connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%d, failed to kill connection, conn not exist", connId);
mError("connId:%d, failed to kill connection, conn not exist", killReq.connId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%d, is killed by user:%s", connId, pReq->user);
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->user);
pConn->killed = 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册