提交 30eb8d50 编写于 作者: S Shengliang Guan

shm

上级 187eb299
...@@ -49,11 +49,13 @@ typedef struct SRpcMsg { ...@@ -49,11 +49,13 @@ typedef struct SRpcMsg {
} SRpcMsg; } SRpcMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
SRpcMsg rpcMsg; uint32_t clientIp;
int32_t rspLen; uint16_t clientPort;
void * pRsp; SRpcMsg rpcMsg;
void * pNode; int32_t rspLen;
void *pRsp;
void *pNode;
} SNodeMsg; } SNodeMsg;
typedef struct SRpcInit { typedef struct SRpcInit {
......
...@@ -42,6 +42,8 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) { ...@@ -42,6 +42,8 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
} }
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->clientIp = connInfo.clientIp;
pMsg->clientPort = connInfo.clientPort;
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg)); memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
return 0; return 0;
} }
......
...@@ -44,7 +44,8 @@ typedef struct { ...@@ -44,7 +44,8 @@ typedef struct {
SQueryDesc *pQueries; SQueryDesc *pQueries;
} SConnObj; } SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime); static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn); static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
...@@ -94,7 +95,8 @@ void mndCleanupProfile(SMnode *pMnode) { ...@@ -94,7 +95,8 @@ void mndCleanupProfile(SMnode *pMnode) {
} }
} }
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) { static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
...@@ -104,8 +106,8 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, ...@@ -104,8 +106,8 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid,
SConnObj connObj = {.id = connId, SConnObj connObj = {.id = connId,
.appStartTimeMs = startTime, .appStartTimeMs = startTime,
.pid = pid, .pid = pid,
.ip = pInfo->clientIp, .ip = ip,
.port = pInfo->clientPort, .port = port,
.killed = 0, .killed = 0,
.loginTimeMs = taosGetTimestampMs(), .loginTimeMs = taosGetTimestampMs(),
.lastAccessTimeMs = 0, .lastAccessTimeMs = 0,
...@@ -114,17 +116,17 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, ...@@ -114,17 +116,17 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid,
.pQueries = NULL}; .pQueries = NULL};
connObj.lastAccessTimeMs = connObj.loginTimeMs; connObj.lastAccessTimeMs = connObj.loginTimeMs;
tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN); tstrncpy(connObj.user, user, TSDB_USER_LEN);
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN); tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 3; int32_t keepTime = tsShellActivityTimer * 3;
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) { if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("conn:%d, failed to put into cache since %s, user:%s", connId, pInfo->user, terrstr()); mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
return NULL; return NULL;
} else { } else {
mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user); mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user);
return pConn; return pConn;
} }
} }
...@@ -184,20 +186,14 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -184,20 +186,14 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t code = -1; int32_t code = -1;
SConnectReq connReq = {0}; SConnectReq connReq = {0};
char ip[30] = {0};
if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) { if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto CONN_OVER; goto CONN_OVER;
} }
SRpcConnInfo info = {0}; taosIp2String(pReq->clientIp, ip);
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
goto CONN_OVER;
}
char ip[30];
taosIp2String(info.clientIp, ip);
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
...@@ -216,7 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -216,7 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
} }
} }
pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime); pConn =
mndCreateConn(pMnode, pReq->user, pReq->clientIp, pReq->clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
goto CONN_OVER; goto CONN_OVER;
...@@ -241,7 +238,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -241,7 +238,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
pReq->rspLen = contLen; pReq->rspLen = contLen;
pReq->pRsp = pRsp; pReq->pRsp = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app); mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app);
code = 0; code = 0;
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include <sys/wait.h> #include <sys/wait.h>
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024) #define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
#define CEIL8(n) (ceil((float)(n) / 8) * 8)
typedef void *(*ProcThreadFp)(void *param); typedef void *(*ProcThreadFp)(void *param);
typedef struct SProcQueue { typedef struct SProcQueue {
...@@ -58,6 +57,11 @@ typedef struct SProcObj { ...@@ -58,6 +57,11 @@ typedef struct SProcObj {
bool stopFlag; bool stopFlag;
} SProcObj; } SProcObj;
static inline int32_t CEIL8(int32_t v) {
const int32_t c = ceil((float)(v) / 8) * 8;
return c < 8 ? 8 : c;
}
static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
TdThreadMutex *pMutex = NULL; TdThreadMutex *pMutex = NULL;
TdThreadMutexAttr mattr = {0}; TdThreadMutexAttr mattr = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册