From 7a8ab98ff1aeefc0227857077192dc39c7554a9b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Jun 2020 10:01:11 +0000 Subject: [PATCH] [TD-464] show connections --- src/client/inc/tscProfile.h | 2 +- src/client/inc/tsclient.h | 1 + src/client/src/tscProfile.c | 5 +- src/client/src/tscSQLParser.c | 6 - src/client/src/tscServer.c | 18 +- src/dnode/src/dnodeMain.c | 10 +- src/inc/taoserror.h | 1 + src/inc/taosmsg.h | 6 + src/mnode/inc/mnodeProfile.h | 3 + src/mnode/inc/mnodeShow.h | 1 + src/mnode/src/mnodeMain.c | 106 +++++------ src/mnode/src/mnodeProfile.c | 337 +++++++++++++++++++--------------- src/mnode/src/mnodeShow.c | 35 +++- src/mnode/src/mnodeTable.c | 20 +- src/util/inc/tcache.h | 9 + src/util/src/tcache.c | 29 +++ 16 files changed, 334 insertions(+), 255 deletions(-) diff --git a/src/client/inc/tscProfile.h b/src/client/inc/tscProfile.h index 16b9efac38..e82b7242a8 100644 --- a/src/client/inc/tscProfile.h +++ b/src/client/inc/tscProfile.h @@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql); void tscRemoveFromSqlList(SSqlObj *pSql); void tscAddIntoStreamList(SSqlStream *pStream); void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj); -char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj); +int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj); void tscKillQuery(STscObj *pObj, uint32_t killId); void tscKillStream(STscObj *pObj, uint32_t killId); void tscKillConnection(STscObj *pObj); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3c1d43d688..935fc3804d 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -293,6 +293,7 @@ typedef struct STscObj { char sversion[TSDB_VERSION_LEN]; char writeAuth : 1; char superAuth : 1; + uint32_t connId; struct SSqlObj * pSql; struct SSqlObj * pHb; struct SSqlObj * sqlList; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 74294d38e0..2162408ee6 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -206,7 +206,8 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { taos_close_stream(pStream); } -char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { +int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { + char * pStart = pMsg; char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; SQqueryList *pQList = (SQqueryList *)pMsg; @@ -270,7 +271,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { /* pthread_mutex_unlock (&pObj->mutex); */ - return pMsg; + return pMsg - pStart; } void tscKillConnection(STscObj *pObj) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 965e85efbd..7b078210bf 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2250,12 +2250,6 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { char* ipStr = strtok(ip->z, &delim); char* portStr = strtok(NULL, &delim); - if (!validateIpAddress(ipStr, strlen(ipStr))) { - memset(pCmd->payload, 0, strlen(pCmd->payload)); - - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); - } - uint16_t port = (uint16_t)strtol(portStr, NULL, 10); if (port <= 0 || port > 65535) { memset(pCmd->payload, 0, strlen(pCmd->payload)); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8ca590a1f6..5fd58178b1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -114,6 +114,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pIpList->numOfIps > 0) tscSetMgmtIpList(pIpList); + pSql->pTscObj->connId = htonl(pRsp->connId); + if (pRsp->killConnection) { tscKillConnection(pObj); } else { @@ -1769,30 +1771,25 @@ int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { } int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - char *pMsg, *pStart; - int msgLen = 0; - int size = 0; - SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; pthread_mutex_lock(&pObj->mutex); - size = tscEstimateHeartBeatMsgLength(pSql); + int size = tscEstimateHeartBeatMsgLength(pSql); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { pthread_mutex_unlock(&pObj->mutex); tscError("%p failed to malloc for heartbeat msg", pSql); return -1; } - pMsg = pCmd->payload; - pStart = pMsg; + SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg*)pCmd->payload; + pHeartbeat->connId = htonl(pSql->pTscObj->connId); - pMsg = tscBuildQueryStreamDesc(pMsg, pObj); + int msgLen = tscBuildQueryStreamDesc((char*)pHeartbeat + sizeof(pHeartbeat->connId), pObj); pthread_mutex_unlock(&pObj->mutex); - msgLen = pMsg - pStart; - pCmd->payloadLen = msgLen; + pCmd->payloadLen = msgLen + sizeof(pHeartbeat->connId); pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT; assert(msgLen + minMsgSize() <= size); @@ -2206,6 +2203,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; + pObj->connId = htonl(pConnect->connId); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 6940081629..119d390556 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -45,7 +45,7 @@ typedef struct { void (*cleanup)(); } SDnodeComponent; -static const SDnodeComponent SDnodeComponents[] = { +static const SDnodeComponent tsDnodeComponents[] = { {"storage", dnodeInitStorage, dnodeCleanupStorage}, {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, @@ -61,14 +61,14 @@ static const SDnodeComponent SDnodeComponents[] = { static void dnodeCleanupComponents(int32_t stepId) { for (int32_t i = stepId; i >= 0; i--) { - SDnodeComponents[i].cleanup(); + tsDnodeComponents[i].cleanup(); } } static int32_t dnodeInitComponents() { int32_t code = 0; - for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) { - if (SDnodeComponents[i].init() != 0) { + for (int32_t i = 0; i < sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]); i++) { + if (tsDnodeComponents[i].init() != 0) { dnodeCleanupComponents(i); code = -1; break; @@ -122,7 +122,7 @@ int32_t dnodeInitSystem() { void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); - dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1); + dnodeCleanupComponents(sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]) - 1); taos_cleanup(); taosCloseLog(); } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index db910c9cae..60b1b68e77 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -151,6 +151,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 0x0405, "no disk perm TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 0x0406, "file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 0x0407, "memory corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUCH_FILE_OR_DIR, 0, 0x0408, "no such file or directory") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_SHELL_CONNS, 0, 0x0409, "too many shell conns") // client TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 0x0481, "invalid client version") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a11f34dd14..9c14a7dc54 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -137,6 +137,7 @@ enum _mgmt_table { TSDB_MGMT_TABLE_SCORES, TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_VNODES, + TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_MAX, }; @@ -287,6 +288,9 @@ typedef struct { char serverVersion[TSDB_VERSION_LEN]; int8_t writeAuth; int8_t superAuth; + int8_t reserved1; + int8_t reserved2; + int32_t connId; SRpcIpSet ipList; } SCMConnectRsp; @@ -712,6 +716,7 @@ typedef struct { } SStreamList; typedef struct { + uint32_t connId; SQqueryList qlist; SStreamList slist; } SCMHeartBeatMsg; @@ -721,6 +726,7 @@ typedef struct { uint32_t streamId; uint32_t totalDnodes; uint32_t onlineDnodes; + uint32_t connId; int8_t killConnection; SRpcIpSet ipList; } SCMHeartBeatRsp; diff --git a/src/mnode/inc/mnodeProfile.h b/src/mnode/inc/mnodeProfile.h index 0311983a38..29a808a189 100644 --- a/src/mnode/inc/mnodeProfile.h +++ b/src/mnode/inc/mnodeProfile.h @@ -24,6 +24,9 @@ extern "C" { int32_t mnodeInitProfile(); void mnodeCleanupProfile(); +uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port); +bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mnodeShow.h b/src/mnode/inc/mnodeShow.h index d571eabfd8..da66e71678 100644 --- a/src/mnode/inc/mnodeShow.h +++ b/src/mnode/inc/mnodeShow.h @@ -28,6 +28,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); +void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index baecfac077..298d10993b 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -33,14 +33,54 @@ #include "mnodeUser.h" #include "mnodeTable.h" #include "mnodeShow.h" +#include "mnodeProfile.h" + +typedef struct { + const char *const name; + int (*init)(); + void (*cleanup)(); +} SMnodeComponent; void *tsMnodeTmr; static bool tsMgmtIsRunning = false; +static const SMnodeComponent tsMnodeComponents[] = { + {"profile", mnodeInitProfile, mnodeCleanupProfile}, + {"accts", mnodeInitAccts, mnodeCleanupAccts}, + {"users", mnodeInitUsers, mnodeCleanupUsers}, + {"dnodes", mnodeInitDnodes, mnodeCleanupDnodes}, + {"dbs", mnodeInitDbs, mnodeCleanupDbs}, + {"vgroups", mnodeInitVgroups, mnodeCleanupVgroups}, + {"tables", mnodeInitTables, mnodeCleanupTables}, + {"mnodes", mnodeInitMnodes, mnodeCleanupMnodes}, + {"sdb", sdbInit, sdbCleanUp}, + {"balance", balanceInit, balanceCleanUp}, + {"grant", grantInit, grantCleanUp}, + {"show", mnodeInitShow, mnodeCleanUpShow} +}; + static void mnodeInitTimer(); static void mnodeCleanupTimer(); static bool mnodeNeedStart() ; +static void mnodeCleanupComponents(int32_t stepId) { + for (int32_t i = stepId; i >= 0; i--) { + tsMnodeComponents[i].cleanup(); + } +} + +static int32_t mnodeInitComponents() { + int32_t code = 0; + for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) { + if (tsMnodeComponents[i].init() != 0) { + mnodeCleanupComponents(i); + code = -1; + break; + } + } + return code; +} + int32_t mnodeStartSystem() { if (tsMgmtIsRunning) { mPrint("mnode module already started..."); @@ -57,57 +97,7 @@ int32_t mnodeStartSystem() { dnodeAllocateMnodeRqueue(); dnodeAllocateMnodePqueue(); - if (mnodeInitAccts() < 0) { - mError("failed to init accts"); - return -1; - } - - if (mnodeInitUsers() < 0) { - mError("failed to init users"); - return -1; - } - - if (mnodeInitDnodes() < 0) { - mError("failed to init dnodes"); - return -1; - } - - if (mnodeInitDbs() < 0) { - mError("failed to init dbs"); - return -1; - } - - if (mnodeInitVgroups() < 0) { - mError("failed to init vgroups"); - return -1; - } - - if (mnodeInitTables() < 0) { - mError("failed to init tables"); - return -1; - } - - if (mnodeInitMnodes() < 0) { - mError("failed to init mnodes"); - return -1; - } - - if (sdbInit() < 0) { - mError("failed to init sdb"); - return -1; - } - - if (balanceInit() < 0) { - mError("failed to init balance") - } - - if (grantInit() < 0) { - mError("failed to init grant"); - return -1; - } - - if (mnodeInitShow() < 0) { - mError("failed to init show"); + if (mnodeInitComponents() != 0) { return -1; } @@ -115,7 +105,6 @@ int32_t mnodeStartSystem() { tsMgmtIsRunning = true; mPrint("mnode is initialized successfully"); - return 0; } @@ -133,17 +122,8 @@ void mnodeCleanupSystem() { dnodeFreeMnodeRqueue(); dnodeFreeMnodePqueue(); mnodeCleanupTimer(); - mnodeCleanUpShow(); - grantCleanUp(); - balanceCleanUp(); - sdbCleanUp(); - mnodeCleanupMnodes(); - mnodeCleanupTables(); - mnodeCleanupVgroups(); - mnodeCleanupDbs(); - mnodeCleanupDnodes(); - mnodeCleanupUsers(); - mnodeCleanupAccts(); + mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1); + mPrint("mnode is cleaned up"); } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index a37f5436c6..4623c87f66 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -18,6 +18,10 @@ #include "taosmsg.h" #include "taoserror.h" #include "tutil.h" +#include "ttime.h" +#include "tcache.h" +#include "tglobal.h" +#include "tdataformat.h" #include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" @@ -32,23 +36,144 @@ #include "mnodeVgroup.h" #include "mnodeWrite.h" -int32_t mnodeSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); - -int32_t mnodeKillQuery(char *qidstr, void *pConn); -int32_t mnodeKillStream(char *qidstr, void *pConn); -int32_t mnodeKillConnection(char *qidstr, void *pConn); +#define CONN_KEEP_TIME (tsShellActivityTimer * 3) +#define CONN_CHECK_TIME (tsShellActivityTimer * 2) typedef struct { - char user[TSDB_TABLE_ID_LEN + 1]; - uint64_t stime; - uint32_t ip; + char user[TSDB_USER_LEN + 1]; + int8_t killed; uint16_t port; -} SConnInfo; + uint32_t ip; + uint32_t connId; + uint64_t stime; +} SConnObj; + +extern void *tsMnodeTmr; +static SCacheObj *tsMnodeConnCache = NULL; +static uint32_t tsConnIndex = 0; + +static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +static void mnodeFreeConn(void *data); +static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg); + +// static int32_t mnodeKillQuery(char *qidstr, void *pConn); +// static int32_t mnodeKillStream(char *qidstr, void *pConn); + +int32_t mnodeInitProfile() { + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); + + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg); + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); + mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); + + tsMnodeConnCache = taosCacheInitWithCb(tsMnodeTmr, CONN_CHECK_TIME, mnodeFreeConn); + return 0; +} + +void mnodeCleanupProfile() { + if (tsMnodeConnCache != NULL) { + mPrint("conn cache is cleanup"); + taosCacheCleanup(tsMnodeConnCache); + tsMnodeConnCache = NULL; + } +} + +uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { + int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); + if (connSize > tsMaxShellConns) { + mError("failed to create conn for user:%s ip:%s:%u, conns:%d larger than maxShellConns:%d, ", user, taosIpStr(ip), + port, connSize, tsMaxShellConns); + terrno = TSDB_CODE_TOO_MANY_SHELL_CONNS; + return 0; + } + + uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1); + if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1); + + SConnObj connObj = { + .ip = ip, + .port = port, + .connId = connId, + .stime = taosGetTimestampMs() + }; + + char key[10]; + sprintf(key, "%u", connId); + strcpy(connObj.user, user); + void *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME); + taosCacheRelease(tsMnodeConnCache, &pConn, false); + + mTrace("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); + return connId; +} + +bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { + char key[10]; + sprintf(key, "%u", connId); + uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); + + SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime); + if (pConn == NULL) { + mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); + return false; + } + + if (pConn->ip != ip || pConn->port != port /* || strcmp(pConn->user, user) != 0 */ ) { + mError("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user, + taosIpStr(ip), port, pConn->user, taosIpStr(pConn->ip), pConn->port); + taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); + return false; + } + + //mTrace("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port); + taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); + return true; +} + +static void mnodeFreeConn(void *data) { + SConnObj *pConn = data; + mTrace("connId:%d, is destroyed", pConn->connId); +} + +static void *mnodeGetNextConn(SHashMutableIterator *pIter, SConnObj **pConn) { + *pConn = NULL; + + if (pIter == NULL) { + pIter = taosHashCreateIter(tsMnodeConnCache->pHashTable); + } + + if (!taosHashIterNext(pIter)) { + taosHashDestroyIter(pIter); + return NULL; + } + + SCacheDataNode **pNode = taosHashIterGet(pIter); + if (pNode == NULL || *pNode == NULL) { + taosHashDestroyIter(pIter); + return NULL; + } + + *pConn = (SConnObj*)((*pNode)->data); + return pIter; +} typedef struct { int numOfConns; int index; - SConnInfo connInfo[]; + SConnObj connInfo[]; } SConnShow; typedef struct { @@ -400,100 +525,23 @@ int32_t mnodeKillStream(char *qidstr, void *pConn) { return TSDB_CODE_INVALID_STREAM_ID; } -int32_t mnodeKillConnection(char *qidstr, void *pConn) { -// void *pConn1 = NULL; -// char * temp, *chr, idstr[64]; -// strcpy(idstr, qidstr); -// -// temp = idstr; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint32_t ip = inet_addr(temp); -// -// temp = chr + 1; -// uint16_t port = htons(atoi(temp)); -// SAcctObj *pAcct = pConn->pAcct; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConn = pAcct->pConn; -// while (pConn) { -// if (pConn->ip == ip && pConn->port == port) { -// // there maybe two connections from a shell -// if (pConn1 == NULL) -// pConn1 = pConn; -// else -// break; -// } -// -// pConn = pConn->next; -// } -// -// if (pConn1) pConn1->killConnection = 1; -// if (pConn) pConn->killConnection = 1; -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// if (pConn1 == NULL) goto _error; -// -// mTrace("connection:%s is there, kill it", qidstr); -// return 0; -// -//_error: -// mTrace("connection:%s is not there", qidstr); - - return TSDB_CODE_INVALID_CONNECTION; -} - - -int mnodeGetConns(SShowObj *pShow, void *pConn) { - // SAcctObj * pAcct = pConn->pAcct; - // SConnShow *pConnShow; - // - // pthread_mutex_lock(&pAcct->mutex); - // - // pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); - // pConnShow->index = 0; - // pConnShow->numOfConns = 0; - // - // if (pAcct->acctInfo.numOfConns > 0) { - // pConn = pAcct->pConn; - // SConnInfo *pConnInfo = pConnShow->connInfo; - // - // while (pConn && pConn->pUser) { - // strcpy(pConnInfo->user, pConn->pUser->user); - // pConnInfo->ip = pConn->ip; - // pConnInfo->port = pConn->port; - // pConnInfo->stime = pConn->stime; - // - // pConnShow->numOfConns++; - // pConnInfo++; - // pConn = pConn->next; - // } - // } - // - // pthread_mutex_unlock(&pAcct->mutex); - // - // // sorting based on useconds - // - // pShow->pIter = pConnShow; - - return 0; -} - int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; SSchema *pSchema = pMeta->schema; + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "connId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "user"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); pSchema[cols].bytes = htons(pShow->bytes[cols]); @@ -509,54 +557,64 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } - pShow->numOfRows = 1000000; - pShow->pIter = NULL; + pShow->numOfRows = taosHashGetSize(tsMnodeConnCache->pHashTable); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - mnodeGetConns(pShow, pConn); return 0; } int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; - char *pWrite; + SConnObj *pConnObj = NULL; int32_t cols = 0; - - SConnShow *pConnShow = (SConnShow *)pShow->pIter; - - if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; + char * pWrite; + char ipStr[TSDB_IPv4ADDR_LEN + 7]; while (numOfRows < rows) { - SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; + pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); + if (pConnObj == NULL) break; + cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->user); + *(int32_t *) pWrite = pConnObj->connId; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pNode->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, TSDB_USER_LEN); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; + snprintf(ipStr, TSDB_IPv4ADDR_LEN + 6, "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, TSDB_IPv4ADDR_LEN + 6); cols++; - numOfRows++; - pConnShow->index++; - } + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pConnObj->stime; + cols++; - if (numOfRows == 0) { - tfree(pConnShow); + numOfRows++; } pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 4; + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + return numOfRows; } +static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + return 0; +} + +static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + return 0; +} + int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; @@ -608,41 +666,18 @@ int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { } int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { - // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - // SUserObj *pUser = mnodeGetUserFromConn(pMsg->thandle); - // if (pUser == NULL) { - // rpcRsp.code = TSDB_CODE_INVALID_USER; - // rpcSendResponse(&rpcRsp); - // return; - // } - - // SCMKillConnMsg *pKill = pMsg->pCont; - // int32_t code; - - // if (!pUser->writeAuth) { - // code = TSDB_CODE_NO_RIGHTS; - // } else { - // code = mgmtKillConnection(pKill->queryId, pMsg->thandle); - // } - - // rpcRsp.code = code; - // rpcSendResponse(&rpcRsp); - // mnodeDecUserRef(pUser); - return TSDB_CODE_SUCCESS; -} - -int32_t mnodeInitProfile() { - // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta); - // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries); - // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta); - // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns); - - // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg); - // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); - // mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); - - return 0; + SUserObj *pUser = pMsg->pUser; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; + SConnObj * pConn = taosCacheAcquireByName(tsMnodeConnCache, pKill->queryId); + if (pConn == NULL) { + mError("connId:%s, failed to kill, conn not exist", pKill->queryId); + return TSDB_CODE_INVALID_CONNECTION; + } else { + mError("connId:%s, is killed by user:%s", pKill->queryId, pUser->user); + pConn->killed = 1; + taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); + return TSDB_CODE_SUCCESS; + } } - -void mnodeCleanupProfile() {} diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index dd6de9d351..0a456a9095 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -227,6 +227,23 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } + SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont; + SRpcConnInfo connInfo; + rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo); + + int32_t connId = htonl(pHBMsg->connId); + if (!mnodeCheckConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort)) { + connId = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + if (connId == 0) { +#if 0 + // do not close existing links, otherwise + mError("failed to create connId, close connect"); + pHBRsp->killConnection = 1; +#endif + } + } + + pHBRsp->connId = htonl(connId); pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); @@ -235,9 +252,9 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { * TODO * Dispose kill stream or kill query message */ - pHBRsp->queryId = 0; - pHBRsp->streamId = 0; - pHBRsp->killConnection = 0; + // pHBRsp->queryId = 0; + // pHBRsp->streamId = 0; + // pHBRsp->killConnection = 0; pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); @@ -281,10 +298,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { goto connect_over; } + int32_t connId = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + if (connId == 0) code = terrno; + sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); strcpy(pConnectRsp->serverVersion, version); pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; + pConnectRsp->connId = htonl(connId); mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); @@ -358,3 +379,11 @@ static void mnodeReleaseShowObj(void *pShow, bool forceRemove) { mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false"); taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove); } + +void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { + if (rows < capacity) { + for (int32_t i = 0; i < numOfCols; ++i) { + memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows); + } + } +} \ No newline at end of file diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 84a1f659d5..31bceacbea 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -61,8 +61,8 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); @@ -568,8 +568,8 @@ int32_t mnodeInitTables() { mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables); - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeGetStreamTableMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeRetrieveStreamTables); return TSDB_CODE_SUCCESS; } @@ -2108,14 +2108,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void return 0; } -static void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { - if (rows < capacity) { - for (int32_t i = 0; i < numOfCols; ++i) { - memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows); - } - } -} - static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) return 0; @@ -2259,7 +2251,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { return code; } -static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; @@ -2305,7 +2297,7 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p return 0; } -static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { SDbObj *pDb = mnodeGetDb(pShow->db); if (pDb == NULL) return 0; diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 6dd707b763..3f9fe07517 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -111,6 +111,15 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz */ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key); +/** + * update the expire time of data in cache + * @param pCacheObj cache object + * @param key key + * @param expireTime new expire time of data + * @return + */ +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime); + /** * Add one reference count for the exist data, and assign this data for a new owner. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 2641d2eacb..7174f94f4a 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -487,6 +487,35 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { return (ptNode != NULL) ? (*ptNode)->data : NULL; } +void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime) { + if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { + return NULL; + } + + uint32_t keyLen = (uint32_t)strlen(key); + + __cache_rd_lock(pCacheObj); + + SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); + if (ptNode != NULL) { + T_REF_INC(*ptNode); + (*ptNode)->expiredTime = expireTime; + } + + __cache_unlock(pCacheObj); + + if (ptNode != NULL) { + atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); + uTrace("key:%s expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); + } else { + atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); + uTrace("key:%s not in cache, retrieved failed", key); + } + + atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); + return (ptNode != NULL) ? (*ptNode)->data : NULL; +} + void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { if (pCacheObj == NULL || data == NULL) return NULL; -- GitLab