提交 ebc8c4d8 编写于 作者: S Shengliang Guan

[TD-464] support kill connection

上级 7a8ab98f
...@@ -93,7 +93,7 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo ...@@ -93,7 +93,7 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo
static int32_t validateDNodeConfig(tDCLSQL* pOptions); static int32_t validateDNodeConfig(tDCLSQL* pOptions);
static int32_t validateLocalConfig(tDCLSQL* pOptions); static int32_t validateLocalConfig(tDCLSQL* pOptions);
static int32_t validateColumnName(char* name); static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
...@@ -532,7 +532,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -532,7 +532,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_KILL_QUERY: case TSDB_SQL_KILL_QUERY:
case TSDB_SQL_KILL_STREAM: case TSDB_SQL_KILL_STREAM:
case TSDB_SQL_KILL_CONNECTION: { case TSDB_SQL_KILL_CONNECTION: {
if ((code = setKillInfo(pSql, pInfo)) != TSDB_CODE_SUCCESS) { if ((code = setKillInfo(pSql, pInfo, pInfo->type)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2231,31 +2231,46 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -2231,31 +2231,46 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
const char* msg1 = "invalid ip address"; const char* msg1 = "invalid connection ID";
const char* msg2 = "invalid port"; const char* msg2 = "invalid query ID";
const char* msg3 = "invalid stream ID";
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type; pCmd->command = pInfo->type;
SSQLToken* ip = &(pInfo->pDCLInfo->ip); SSQLToken* idStr = &(pInfo->pDCLInfo->ip);
if (ip->n > TSDB_KILL_MSG_LEN) { if (idStr->n > TSDB_KILL_MSG_LEN) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
strncpy(pCmd->payload, ip->z, ip->n); strncpy(pCmd->payload, idStr->z, idStr->n);
const char delim = ':'; const char delim = ':';
char* connIdStr = strtok(idStr->z, &delim);
char* queryIdStr = strtok(NULL, &delim);
char* ipStr = strtok(ip->z, &delim); int32_t connId = (int32_t)strtol(connIdStr, NULL, 10);
char* portStr = strtok(NULL, &delim); if (connId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload));
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
uint16_t port = (uint16_t)strtol(portStr, NULL, 10); if (killType == TSDB_SQL_KILL_CONNECTION) {
if (port <= 0 || port > 65535) { strncpy(pCmd->payload, idStr->z, idStr->n);
return TSDB_CODE_SUCCESS;
}
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
if (queryId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload)); memset(pCmd->payload, 0, strlen(pCmd->payload));
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); if (killType == TSDB_SQL_KILL_QUERY) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} else {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
} }
strncpy(pCmd->payload, idStr->z, idStr->n);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -148,8 +148,8 @@ ...@@ -148,8 +148,8 @@
#define TK_SET 130 #define TK_SET 130
#define TK_KILL 131 #define TK_KILL 131
#define TK_CONNECTION 132 #define TK_CONNECTION 132
#define TK_COLON 133 #define TK_STREAM 133
#define TK_STREAM 134 #define TK_COLON 134
#define TK_ABORT 135 #define TK_ABORT 135
#define TK_AFTER 136 #define TK_AFTER 136
#define TK_ATTACH 137 #define TK_ATTACH 137
......
...@@ -21,11 +21,22 @@ extern "C" { ...@@ -21,11 +21,22 @@ extern "C" {
#endif #endif
#include "mnodeDef.h" #include "mnodeDef.h"
typedef struct {
char user[TSDB_USER_LEN + 1];
int8_t killed;
uint16_t port;
uint32_t ip;
uint32_t connId;
uint64_t stime;
uint64_t lastAccess;
} SConnObj;
int32_t mnodeInitProfile(); int32_t mnodeInitProfile();
void mnodeCleanupProfile(); void mnodeCleanupProfile();
uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port); SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port); SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -39,15 +39,6 @@ ...@@ -39,15 +39,6 @@
#define CONN_KEEP_TIME (tsShellActivityTimer * 3) #define CONN_KEEP_TIME (tsShellActivityTimer * 3)
#define CONN_CHECK_TIME (tsShellActivityTimer * 2) #define CONN_CHECK_TIME (tsShellActivityTimer * 2)
typedef struct {
char user[TSDB_USER_LEN + 1];
int8_t killed;
uint16_t port;
uint32_t ip;
uint32_t connId;
uint64_t stime;
} SConnObj;
extern void *tsMnodeTmr; extern void *tsMnodeTmr;
static SCacheObj *tsMnodeConnCache = NULL; static SCacheObj *tsMnodeConnCache = NULL;
static uint32_t tsConnIndex = 0; static uint32_t tsConnIndex = 0;
...@@ -91,13 +82,13 @@ void mnodeCleanupProfile() { ...@@ -91,13 +82,13 @@ void mnodeCleanupProfile() {
} }
} }
uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable);
if (connSize > tsMaxShellConns) { if (connSize > tsMaxShellConns) {
mError("failed to create conn for user:%s ip:%s:%u, conns:%d larger than maxShellConns:%d, ", user, taosIpStr(ip), mError("failed to create conn for user:%s ip:%s:%u, conns:%d larger than maxShellConns:%d, ", user, taosIpStr(ip),
port, connSize, tsMaxShellConns); port, connSize, tsMaxShellConns);
terrno = TSDB_CODE_TOO_MANY_SHELL_CONNS; terrno = TSDB_CODE_TOO_MANY_SHELL_CONNS;
return 0; return NULL;
} }
uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1); uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
...@@ -109,18 +100,22 @@ uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { ...@@ -109,18 +100,22 @@ uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
.connId = connId, .connId = connId,
.stime = taosGetTimestampMs() .stime = taosGetTimestampMs()
}; };
char key[10];
sprintf(key, "%u", connId);
strcpy(connObj.user, user); strcpy(connObj.user, user);
void *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME);
taosCacheRelease(tsMnodeConnCache, &pConn, false); char key[10];
sprintf(key, "%u", connId);
SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME);
mTrace("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mTrace("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return connId; return pConn;
}
void mnodeReleaseConn(SConnObj *pConn) {
if(pConn == NULL) return;
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false);
} }
bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
char key[10]; char key[10];
sprintf(key, "%u", connId); sprintf(key, "%u", connId);
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
...@@ -128,19 +123,19 @@ bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { ...@@ -128,19 +123,19 @@ bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime); SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return false; return NULL;
} }
if (pConn->ip != ip || pConn->port != port /* || strcmp(pConn->user, user) != 0 */ ) { if (pConn->ip != ip || pConn->port != port /* || strcmp(pConn->user, user) != 0 */) {
mError("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user, mError("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user,
taosIpStr(ip), port, pConn->user, taosIpStr(pConn->ip), pConn->port); taosIpStr(ip), port, pConn->user, taosIpStr(pConn->ip), pConn->port);
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
return false; return NULL;
} }
//mTrace("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port); // mTrace("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port);
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); pConn->lastAccess = expireTime;
return true; return pConn;
} }
static void mnodeFreeConn(void *data) { static void mnodeFreeConn(void *data) {
...@@ -553,6 +548,12 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { ...@@ -553,6 +548,12 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "last access");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols); pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
...@@ -597,6 +598,10 @@ int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pCon ...@@ -597,6 +598,10 @@ int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pCon
*(int64_t *)pWrite = pConnObj->stime; *(int64_t *)pWrite = pConnObj->stime;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pConnObj->lastAccess;
cols++;
numOfRows++; numOfRows++;
} }
...@@ -675,7 +680,7 @@ int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { ...@@ -675,7 +680,7 @@ int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId); mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
return TSDB_CODE_INVALID_CONNECTION; return TSDB_CODE_INVALID_CONNECTION;
} else { } else {
mError("connId:%s, is killed by user:%s", pKill->queryId, pUser->user); mPrint("connId:%s, is killed by user:%s", pKill->queryId, pUser->user);
pConn->killed = 1; pConn->killed = 1;
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -232,22 +232,26 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -232,22 +232,26 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo); rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
int32_t connId = htonl(pHBMsg->connId); int32_t connId = htonl(pHBMsg->connId);
if (!mnodeCheckConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort)) { SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
connId = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); if (pConn == NULL) {
if (connId == 0) { pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
#if 0 }
// do not close existing links, otherwise
mError("failed to create connId, close connect"); if (pConn == NULL) {
// do not close existing links, otherwise
// mError("failed to create connId, close connect");
// pHBRsp->killConnection = 1;
} else {
pHBRsp->connId = htonl(pConn->connId);
if (pConn->killed != 0) {
pHBRsp->killConnection = 1; pHBRsp->killConnection = 1;
#endif }
}
} }
pHBRsp->connId = htonl(connId);
pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); mnodeGetMnodeIpSetForShell(&pHBRsp->ipList);
/* /*
* TODO * TODO
* Dispose kill stream or kill query message * Dispose kill stream or kill query message
...@@ -259,6 +263,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -259,6 +263,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.rsp = pHBRsp;
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
mnodeReleaseConn(pConn);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -298,15 +303,19 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -298,15 +303,19 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
goto connect_over; goto connect_over;
} }
int32_t connId = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (connId == 0) code = terrno; if (pConn == NULL) {
code = terrno;
} else {
pConnectRsp->connId = htonl(pConn->connId);
mnodeReleaseConn(pConn);
}
sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
strcpy(pConnectRsp->serverVersion, version); strcpy(pConnectRsp->serverVersion, version);
pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth; pConnectRsp->superAuth = pUser->superAuth;
pConnectRsp->connId = htonl(connId);
mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList);
connect_over: connect_over:
......
此差异已折叠。
...@@ -111,7 +111,7 @@ echo "serverPort ${NODE}" >> $TAOS_CFG ...@@ -111,7 +111,7 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 199" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册