提交 7a8ab98f 编写于 作者: S Shengliang Guan

[TD-464] show connections

上级 a96d4114
...@@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql); ...@@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql);
void tscRemoveFromSqlList(SSqlObj *pSql); void tscRemoveFromSqlList(SSqlObj *pSql);
void tscAddIntoStreamList(SSqlStream *pStream); void tscAddIntoStreamList(SSqlStream *pStream);
void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj); void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj);
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj); int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj);
void tscKillQuery(STscObj *pObj, uint32_t killId); void tscKillQuery(STscObj *pObj, uint32_t killId);
void tscKillStream(STscObj *pObj, uint32_t killId); void tscKillStream(STscObj *pObj, uint32_t killId);
void tscKillConnection(STscObj *pObj); void tscKillConnection(STscObj *pObj);
......
...@@ -293,6 +293,7 @@ typedef struct STscObj { ...@@ -293,6 +293,7 @@ typedef struct STscObj {
char sversion[TSDB_VERSION_LEN]; char sversion[TSDB_VERSION_LEN];
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
uint32_t connId;
struct SSqlObj * pSql; struct SSqlObj * pSql;
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
......
...@@ -206,7 +206,8 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { ...@@ -206,7 +206,8 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
taos_close_stream(pStream); taos_close_stream(pStream);
} }
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
char * pStart = pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SQqueryList *pQList = (SQqueryList *)pMsg; SQqueryList *pQList = (SQqueryList *)pMsg;
...@@ -270,7 +271,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { ...@@ -270,7 +271,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
/* pthread_mutex_unlock (&pObj->mutex); */ /* pthread_mutex_unlock (&pObj->mutex); */
return pMsg; return pMsg - pStart;
} }
void tscKillConnection(STscObj *pObj) { void tscKillConnection(STscObj *pObj) {
......
...@@ -2250,12 +2250,6 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -2250,12 +2250,6 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char* ipStr = strtok(ip->z, &delim); char* ipStr = strtok(ip->z, &delim);
char* portStr = strtok(NULL, &delim); char* portStr = strtok(NULL, &delim);
if (!validateIpAddress(ipStr, strlen(ipStr))) {
memset(pCmd->payload, 0, strlen(pCmd->payload));
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
uint16_t port = (uint16_t)strtol(portStr, NULL, 10); uint16_t port = (uint16_t)strtol(portStr, NULL, 10);
if (port <= 0 || port > 65535) { if (port <= 0 || port > 65535) {
memset(pCmd->payload, 0, strlen(pCmd->payload)); memset(pCmd->payload, 0, strlen(pCmd->payload));
......
...@@ -114,6 +114,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -114,6 +114,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pIpList->numOfIps > 0) if (pIpList->numOfIps > 0)
tscSetMgmtIpList(pIpList); tscSetMgmtIpList(pIpList);
pSql->pTscObj->connId = htonl(pRsp->connId);
if (pRsp->killConnection) { if (pRsp->killConnection) {
tscKillConnection(pObj); tscKillConnection(pObj);
} else { } else {
...@@ -1769,30 +1771,25 @@ int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { ...@@ -1769,30 +1771,25 @@ int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
} }
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart;
int msgLen = 0;
int size = 0;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
pthread_mutex_lock(&pObj->mutex); pthread_mutex_lock(&pObj->mutex);
size = tscEstimateHeartBeatMsgLength(pSql); int size = tscEstimateHeartBeatMsgLength(pSql);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
tscError("%p failed to malloc for heartbeat msg", pSql); tscError("%p failed to malloc for heartbeat msg", pSql);
return -1; return -1;
} }
pMsg = pCmd->payload; SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg*)pCmd->payload;
pStart = pMsg; pHeartbeat->connId = htonl(pSql->pTscObj->connId);
pMsg = tscBuildQueryStreamDesc(pMsg, pObj); int msgLen = tscBuildQueryStreamDesc((char*)pHeartbeat + sizeof(pHeartbeat->connId), pObj);
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
msgLen = pMsg - pStart; pCmd->payloadLen = msgLen + sizeof(pHeartbeat->connId);
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT; pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
...@@ -2206,6 +2203,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2206,6 +2203,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth; pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0; return 0;
......
...@@ -45,7 +45,7 @@ typedef struct { ...@@ -45,7 +45,7 @@ typedef struct {
void (*cleanup)(); void (*cleanup)();
} SDnodeComponent; } SDnodeComponent;
static const SDnodeComponent SDnodeComponents[] = { static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"storage", dnodeInitStorage, dnodeCleanupStorage},
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
...@@ -61,14 +61,14 @@ static const SDnodeComponent SDnodeComponents[] = { ...@@ -61,14 +61,14 @@ static const SDnodeComponent SDnodeComponents[] = {
static void dnodeCleanupComponents(int32_t stepId) { static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) { for (int32_t i = stepId; i >= 0; i--) {
SDnodeComponents[i].cleanup(); tsDnodeComponents[i].cleanup();
} }
} }
static int32_t dnodeInitComponents() { static int32_t dnodeInitComponents() {
int32_t code = 0; int32_t code = 0;
for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) { for (int32_t i = 0; i < sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]); i++) {
if (SDnodeComponents[i].init() != 0) { if (tsDnodeComponents[i].init() != 0) {
dnodeCleanupComponents(i); dnodeCleanupComponents(i);
code = -1; code = -1;
break; break;
...@@ -122,7 +122,7 @@ int32_t dnodeInitSystem() { ...@@ -122,7 +122,7 @@ int32_t dnodeInitSystem() {
void dnodeCleanUpSystem() { void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1); dnodeCleanupComponents(sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]) - 1);
taos_cleanup(); taos_cleanup();
taosCloseLog(); taosCloseLog();
} }
......
...@@ -151,6 +151,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 0x0405, "no disk perm ...@@ -151,6 +151,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 0x0405, "no disk perm
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 0x0406, "file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 0x0406, "file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 0x0407, "memory corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 0x0407, "memory corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUCH_FILE_OR_DIR, 0, 0x0408, "no such file or directory") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUCH_FILE_OR_DIR, 0, 0x0408, "no such file or directory")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_SHELL_CONNS, 0, 0x0409, "too many shell conns")
// client // client
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 0x0481, "invalid client version") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 0x0481, "invalid client version")
......
...@@ -137,6 +137,7 @@ enum _mgmt_table { ...@@ -137,6 +137,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_SCORES, TSDB_MGMT_TABLE_SCORES,
TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_STREAMTABLES,
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
}; };
...@@ -287,6 +288,9 @@ typedef struct { ...@@ -287,6 +288,9 @@ typedef struct {
char serverVersion[TSDB_VERSION_LEN]; char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth; int8_t writeAuth;
int8_t superAuth; int8_t superAuth;
int8_t reserved1;
int8_t reserved2;
int32_t connId;
SRpcIpSet ipList; SRpcIpSet ipList;
} SCMConnectRsp; } SCMConnectRsp;
...@@ -712,6 +716,7 @@ typedef struct { ...@@ -712,6 +716,7 @@ typedef struct {
} SStreamList; } SStreamList;
typedef struct { typedef struct {
uint32_t connId;
SQqueryList qlist; SQqueryList qlist;
SStreamList slist; SStreamList slist;
} SCMHeartBeatMsg; } SCMHeartBeatMsg;
...@@ -721,6 +726,7 @@ typedef struct { ...@@ -721,6 +726,7 @@ typedef struct {
uint32_t streamId; uint32_t streamId;
uint32_t totalDnodes; uint32_t totalDnodes;
uint32_t onlineDnodes; uint32_t onlineDnodes;
uint32_t connId;
int8_t killConnection; int8_t killConnection;
SRpcIpSet ipList; SRpcIpSet ipList;
} SCMHeartBeatRsp; } SCMHeartBeatRsp;
......
...@@ -24,6 +24,9 @@ extern "C" { ...@@ -24,6 +24,9 @@ extern "C" {
int32_t mnodeInitProfile(); int32_t mnodeInitProfile();
void mnodeCleanupProfile(); void mnodeCleanupProfile();
uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,6 +28,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon ...@@ -28,6 +28,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,14 +33,54 @@ ...@@ -33,14 +33,54 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeTable.h" #include "mnodeTable.h"
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeProfile.h"
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
} SMnodeComponent;
void *tsMnodeTmr; void *tsMnodeTmr;
static bool tsMgmtIsRunning = false; static bool tsMgmtIsRunning = false;
static const SMnodeComponent tsMnodeComponents[] = {
{"profile", mnodeInitProfile, mnodeCleanupProfile},
{"accts", mnodeInitAccts, mnodeCleanupAccts},
{"users", mnodeInitUsers, mnodeCleanupUsers},
{"dnodes", mnodeInitDnodes, mnodeCleanupDnodes},
{"dbs", mnodeInitDbs, mnodeCleanupDbs},
{"vgroups", mnodeInitVgroups, mnodeCleanupVgroups},
{"tables", mnodeInitTables, mnodeCleanupTables},
{"mnodes", mnodeInitMnodes, mnodeCleanupMnodes},
{"sdb", sdbInit, sdbCleanUp},
{"balance", balanceInit, balanceCleanUp},
{"grant", grantInit, grantCleanUp},
{"show", mnodeInitShow, mnodeCleanUpShow}
};
static void mnodeInitTimer(); static void mnodeInitTimer();
static void mnodeCleanupTimer(); static void mnodeCleanupTimer();
static bool mnodeNeedStart() ; static bool mnodeNeedStart() ;
static void mnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
tsMnodeComponents[i].cleanup();
}
}
static int32_t mnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) {
if (tsMnodeComponents[i].init() != 0) {
mnodeCleanupComponents(i);
code = -1;
break;
}
}
return code;
}
int32_t mnodeStartSystem() { int32_t mnodeStartSystem() {
if (tsMgmtIsRunning) { if (tsMgmtIsRunning) {
mPrint("mnode module already started..."); mPrint("mnode module already started...");
...@@ -57,57 +97,7 @@ int32_t mnodeStartSystem() { ...@@ -57,57 +97,7 @@ int32_t mnodeStartSystem() {
dnodeAllocateMnodeRqueue(); dnodeAllocateMnodeRqueue();
dnodeAllocateMnodePqueue(); dnodeAllocateMnodePqueue();
if (mnodeInitAccts() < 0) { if (mnodeInitComponents() != 0) {
mError("failed to init accts");
return -1;
}
if (mnodeInitUsers() < 0) {
mError("failed to init users");
return -1;
}
if (mnodeInitDnodes() < 0) {
mError("failed to init dnodes");
return -1;
}
if (mnodeInitDbs() < 0) {
mError("failed to init dbs");
return -1;
}
if (mnodeInitVgroups() < 0) {
mError("failed to init vgroups");
return -1;
}
if (mnodeInitTables() < 0) {
mError("failed to init tables");
return -1;
}
if (mnodeInitMnodes() < 0) {
mError("failed to init mnodes");
return -1;
}
if (sdbInit() < 0) {
mError("failed to init sdb");
return -1;
}
if (balanceInit() < 0) {
mError("failed to init balance")
}
if (grantInit() < 0) {
mError("failed to init grant");
return -1;
}
if (mnodeInitShow() < 0) {
mError("failed to init show");
return -1; return -1;
} }
...@@ -115,7 +105,6 @@ int32_t mnodeStartSystem() { ...@@ -115,7 +105,6 @@ int32_t mnodeStartSystem() {
tsMgmtIsRunning = true; tsMgmtIsRunning = true;
mPrint("mnode is initialized successfully"); mPrint("mnode is initialized successfully");
return 0; return 0;
} }
...@@ -133,17 +122,8 @@ void mnodeCleanupSystem() { ...@@ -133,17 +122,8 @@ void mnodeCleanupSystem() {
dnodeFreeMnodeRqueue(); dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue(); dnodeFreeMnodePqueue();
mnodeCleanupTimer(); mnodeCleanupTimer();
mnodeCleanUpShow(); mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
grantCleanUp();
balanceCleanUp();
sdbCleanUp();
mnodeCleanupMnodes();
mnodeCleanupTables();
mnodeCleanupVgroups();
mnodeCleanupDbs();
mnodeCleanupDnodes();
mnodeCleanupUsers();
mnodeCleanupAccts();
mPrint("mnode is cleaned up"); mPrint("mnode is cleaned up");
} }
......
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
#include "tutil.h" #include "tutil.h"
#include "ttime.h"
#include "tcache.h"
#include "tglobal.h"
#include "tdataformat.h"
#include "mnode.h" #include "mnode.h"
#include "mnodeDef.h" #include "mnodeDef.h"
#include "mnodeInt.h" #include "mnodeInt.h"
...@@ -32,23 +36,144 @@ ...@@ -32,23 +36,144 @@
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
#include "mnodeWrite.h" #include "mnodeWrite.h"
int32_t mnodeSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); #define CONN_KEEP_TIME (tsShellActivityTimer * 3)
#define CONN_CHECK_TIME (tsShellActivityTimer * 2)
int32_t mnodeKillQuery(char *qidstr, void *pConn);
int32_t mnodeKillStream(char *qidstr, void *pConn);
int32_t mnodeKillConnection(char *qidstr, void *pConn);
typedef struct { typedef struct {
char user[TSDB_TABLE_ID_LEN + 1]; char user[TSDB_USER_LEN + 1];
uint64_t stime; int8_t killed;
uint32_t ip;
uint16_t port; uint16_t port;
} SConnInfo; uint32_t ip;
uint32_t connId;
uint64_t stime;
} SConnObj;
extern void *tsMnodeTmr;
static SCacheObj *tsMnodeConnCache = NULL;
static uint32_t tsConnIndex = 0;
static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeFreeConn(void *data);
static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg);
// static int32_t mnodeKillQuery(char *qidstr, void *pConn);
// static int32_t mnodeKillStream(char *qidstr, void *pConn);
int32_t mnodeInitProfile() {
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
tsMnodeConnCache = taosCacheInitWithCb(tsMnodeTmr, CONN_CHECK_TIME, mnodeFreeConn);
return 0;
}
void mnodeCleanupProfile() {
if (tsMnodeConnCache != NULL) {
mPrint("conn cache is cleanup");
taosCacheCleanup(tsMnodeConnCache);
tsMnodeConnCache = NULL;
}
}
uint32_t mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable);
if (connSize > tsMaxShellConns) {
mError("failed to create conn for user:%s ip:%s:%u, conns:%d larger than maxShellConns:%d, ", user, taosIpStr(ip),
port, connSize, tsMaxShellConns);
terrno = TSDB_CODE_TOO_MANY_SHELL_CONNS;
return 0;
}
uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1);
SConnObj connObj = {
.ip = ip,
.port = port,
.connId = connId,
.stime = taosGetTimestampMs()
};
char key[10];
sprintf(key, "%u", connId);
strcpy(connObj.user, user);
void *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME);
taosCacheRelease(tsMnodeConnCache, &pConn, false);
mTrace("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return connId;
}
bool mnodeCheckConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
char key[10];
sprintf(key, "%u", connId);
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime);
if (pConn == NULL) {
mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return false;
}
if (pConn->ip != ip || pConn->port != port /* || strcmp(pConn->user, user) != 0 */ ) {
mError("connId:%d, incoming conn user:%s ip:%s:%u, not match exist conn user:%s ip:%s:%u", connId, user,
taosIpStr(ip), port, pConn->user, taosIpStr(pConn->ip), pConn->port);
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false);
return false;
}
//mTrace("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port);
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false);
return true;
}
static void mnodeFreeConn(void *data) {
SConnObj *pConn = data;
mTrace("connId:%d, is destroyed", pConn->connId);
}
static void *mnodeGetNextConn(SHashMutableIterator *pIter, SConnObj **pConn) {
*pConn = NULL;
if (pIter == NULL) {
pIter = taosHashCreateIter(tsMnodeConnCache->pHashTable);
}
if (!taosHashIterNext(pIter)) {
taosHashDestroyIter(pIter);
return NULL;
}
SCacheDataNode **pNode = taosHashIterGet(pIter);
if (pNode == NULL || *pNode == NULL) {
taosHashDestroyIter(pIter);
return NULL;
}
*pConn = (SConnObj*)((*pNode)->data);
return pIter;
}
typedef struct { typedef struct {
int numOfConns; int numOfConns;
int index; int index;
SConnInfo connInfo[]; SConnObj connInfo[];
} SConnShow; } SConnShow;
typedef struct { typedef struct {
...@@ -400,100 +525,23 @@ int32_t mnodeKillStream(char *qidstr, void *pConn) { ...@@ -400,100 +525,23 @@ int32_t mnodeKillStream(char *qidstr, void *pConn) {
return TSDB_CODE_INVALID_STREAM_ID; return TSDB_CODE_INVALID_STREAM_ID;
} }
int32_t mnodeKillConnection(char *qidstr, void *pConn) {
// void *pConn1 = NULL;
// char * temp, *chr, idstr[64];
// strcpy(idstr, qidstr);
//
// temp = idstr;
// chr = strchr(temp, ':');
// if (chr == NULL) goto _error;
// *chr = 0;
// uint32_t ip = inet_addr(temp);
//
// temp = chr + 1;
// uint16_t port = htons(atoi(temp));
// SAcctObj *pAcct = pConn->pAcct;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConn = pAcct->pConn;
// while (pConn) {
// if (pConn->ip == ip && pConn->port == port) {
// // there maybe two connections from a shell
// if (pConn1 == NULL)
// pConn1 = pConn;
// else
// break;
// }
//
// pConn = pConn->next;
// }
//
// if (pConn1) pConn1->killConnection = 1;
// if (pConn) pConn->killConnection = 1;
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// if (pConn1 == NULL) goto _error;
//
// mTrace("connection:%s is there, kill it", qidstr);
// return 0;
//
//_error:
// mTrace("connection:%s is not there", qidstr);
return TSDB_CODE_INVALID_CONNECTION;
}
int mnodeGetConns(SShowObj *pShow, void *pConn) {
// SAcctObj * pAcct = pConn->pAcct;
// SConnShow *pConnShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
// pConnShow->index = 0;
// pConnShow->numOfConns = 0;
//
// if (pAcct->acctInfo.numOfConns > 0) {
// pConn = pAcct->pConn;
// SConnInfo *pConnInfo = pConnShow->connInfo;
//
// while (pConn && pConn->pUser) {
// strcpy(pConnInfo->user, pConn->pUser->user);
// pConnInfo->ip = pConn->ip;
// pConnInfo->port = pConn->port;
// pConnInfo->stime = pConn->stime;
//
// pConnShow->numOfConns++;
// pConnInfo++;
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pIter = pConnShow;
return 0;
}
int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "connId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user"); strcpy(pSchema[cols].name, "user");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port"); strcpy(pSchema[cols].name, "ip:port");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
...@@ -509,54 +557,64 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { ...@@ -509,54 +557,64 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
pShow->offset[0] = 0; pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = 1000000; pShow->numOfRows = taosHashGetSize(tsMnodeConnCache->pHashTable);
pShow->pIter = NULL;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mnodeGetConns(pShow, pConn);
return 0; return 0;
} }
int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
char *pWrite; SConnObj *pConnObj = NULL;
int32_t cols = 0; int32_t cols = 0;
char * pWrite;
SConnShow *pConnShow = (SConnShow *)pShow->pIter; char ipStr[TSDB_IPv4ADDR_LEN + 7];
if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index;
while (numOfRows < rows) { while (numOfRows < rows) {
SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj);
if (pConnObj == NULL) break;
cols = 0; cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pNode->user); *(int32_t *) pWrite = pConnObj->connId;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
uint32_t ip = pNode->ip; STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, TSDB_USER_LEN);
sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port));
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pNode->stime; snprintf(ipStr, TSDB_IPv4ADDR_LEN + 6, "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, TSDB_IPv4ADDR_LEN + 6);
cols++; cols++;
numOfRows++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
pConnShow->index++; *(int64_t *)pWrite = pConnObj->stime;
} cols++;
if (numOfRows == 0) { numOfRows++;
tfree(pConnShow);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 4;
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
return numOfRows; return numOfRows;
} }
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
return 0;
}
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
return 0;
}
int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) {
// SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
...@@ -608,41 +666,18 @@ int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { ...@@ -608,41 +666,18 @@ int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) {
} }
int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
// SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SUserObj *pUser = pMsg->pUser;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
// SUserObj *pUser = mnodeGetUserFromConn(pMsg->thandle); SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
// if (pUser == NULL) { SConnObj * pConn = taosCacheAcquireByName(tsMnodeConnCache, pKill->queryId);
// rpcRsp.code = TSDB_CODE_INVALID_USER; if (pConn == NULL) {
// rpcSendResponse(&rpcRsp); mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
// return; return TSDB_CODE_INVALID_CONNECTION;
// } } else {
mError("connId:%s, is killed by user:%s", pKill->queryId, pUser->user);
// SCMKillConnMsg *pKill = pMsg->pCont; pConn->killed = 1;
// int32_t code; taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false);
// if (!pUser->writeAuth) {
// code = TSDB_CODE_NO_RIGHTS;
// } else {
// code = mgmtKillConnection(pKill->queryId, pMsg->thandle);
// }
// rpcRsp.code = code;
// rpcSendResponse(&rpcRsp);
// mnodeDecUserRef(pUser);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
} }
int32_t mnodeInitProfile() {
// mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta);
// mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries);
// mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mnodeGetConnsMeta);
// mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mnodeRetrieveConns);
// mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mnodeProcessKillQueryMsg);
// mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
// mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
return 0;
}
void mnodeCleanupProfile() {}
...@@ -227,6 +227,23 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -227,6 +227,23 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
SRpcConnInfo connInfo;
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
int32_t connId = htonl(pHBMsg->connId);
if (!mnodeCheckConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort)) {
connId = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (connId == 0) {
#if 0
// do not close existing links, otherwise
mError("failed to create connId, close connect");
pHBRsp->killConnection = 1;
#endif
}
}
pHBRsp->connId = htonl(connId);
pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); mnodeGetMnodeIpSetForShell(&pHBRsp->ipList);
...@@ -235,9 +252,9 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -235,9 +252,9 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
* TODO * TODO
* Dispose kill stream or kill query message * Dispose kill stream or kill query message
*/ */
pHBRsp->queryId = 0; // pHBRsp->queryId = 0;
pHBRsp->streamId = 0; // pHBRsp->streamId = 0;
pHBRsp->killConnection = 0; // pHBRsp->killConnection = 0;
pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.rsp = pHBRsp;
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
...@@ -281,10 +298,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -281,10 +298,14 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
goto connect_over; goto connect_over;
} }
int32_t connId = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (connId == 0) code = terrno;
sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
strcpy(pConnectRsp->serverVersion, version); strcpy(pConnectRsp->serverVersion, version);
pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth; pConnectRsp->superAuth = pUser->superAuth;
pConnectRsp->connId = htonl(connId);
mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList);
...@@ -358,3 +379,11 @@ static void mnodeReleaseShowObj(void *pShow, bool forceRemove) { ...@@ -358,3 +379,11 @@ static void mnodeReleaseShowObj(void *pShow, bool forceRemove) {
mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false"); mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false");
taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove); taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove);
} }
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
\ No newline at end of file
...@@ -61,8 +61,8 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -61,8 +61,8 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
...@@ -568,8 +568,8 @@ int32_t mnodeInitTables() { ...@@ -568,8 +568,8 @@ int32_t mnodeInitTables() {
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeGetStreamTableMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeRetrieveStreamTables);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2108,14 +2108,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -2108,14 +2108,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
return 0; return 0;
} }
static void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
...@@ -2259,7 +2251,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { ...@@ -2259,7 +2251,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
return code; return code;
} }
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
...@@ -2305,7 +2297,7 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -2305,7 +2297,7 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
return 0; return 0;
} }
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
......
...@@ -111,6 +111,15 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz ...@@ -111,6 +111,15 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
*/ */
void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key); void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key);
/**
* update the expire time of data in cache
* @param pCacheObj cache object
* @param key key
* @param expireTime new expire time of data
* @return
*/
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime);
/** /**
* Add one reference count for the exist data, and assign this data for a new owner. * Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
......
...@@ -487,6 +487,35 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { ...@@ -487,6 +487,35 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
return (ptNode != NULL) ? (*ptNode)->data : NULL; return (ptNode != NULL) ? (*ptNode)->data : NULL;
} }
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
return NULL;
}
uint32_t keyLen = (uint32_t)strlen(key);
__cache_rd_lock(pCacheObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
if (ptNode != NULL) {
T_REF_INC(*ptNode);
(*ptNode)->expiredTime = expireTime;
}
__cache_unlock(pCacheObj);
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uTrace("key:%s expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uTrace("key:%s not in cache, retrieved failed", key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL; if (pCacheObj == NULL || data == NULL) return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册