提交 72cb9c01 编写于 作者: S Shengliang Guan

minor changes

上级 f4e648b7
...@@ -25,11 +25,11 @@ ...@@ -25,11 +25,11 @@
#include "dnodeMnode.h" #include "dnodeMnode.h"
#include "dnodeVnodes.h" #include "dnodeVnodes.h"
#include "mnode.h" #include "mnode.h"
#include "vnode.h"
static struct { static struct {
void *serverRpc; void *peerRpc;
void *clientRpc;
void *shellRpc; void *shellRpc;
void *clientRpc;
MsgFp msgFp[TSDB_MSG_TYPE_MAX]; MsgFp msgFp[TSDB_MSG_TYPE_MAX];
} tsTrans; } tsTrans;
...@@ -120,7 +120,7 @@ static void dnodeInitMsgFp() { ...@@ -120,7 +120,7 @@ static void dnodeInitMsgFp() {
} }
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rspMsg = {.handle = pMsg->handle};
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
...@@ -154,7 +154,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -154,7 +154,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
} }
static int32_t dnodeInitServer() { static int32_t dnodeInitPeerServer() {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort; rpcInit.localPort = tsDnodeDnodePort;
...@@ -165,8 +165,8 @@ static int32_t dnodeInitServer() { ...@@ -165,8 +165,8 @@ static int32_t dnodeInitServer() {
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
tsTrans.serverRpc = rpcOpen(&rpcInit); tsTrans.peerRpc = rpcOpen(&rpcInit);
if (tsTrans.serverRpc == NULL) { if (tsTrans.peerRpc == NULL) {
dError("failed to init peer rpc server"); dError("failed to init peer rpc server");
return -1; return -1;
} }
...@@ -175,10 +175,10 @@ static int32_t dnodeInitServer() { ...@@ -175,10 +175,10 @@ static int32_t dnodeInitServer() {
return 0; return 0;
} }
static void dnodeCleanupServer() { static void dnodeCleanupPeerServer() {
if (tsTrans.serverRpc) { if (tsTrans.peerRpc) {
rpcClose(tsTrans.serverRpc); rpcClose(tsTrans.peerRpc);
tsTrans.serverRpc = NULL; tsTrans.peerRpc = NULL;
dInfo("dnode peer server is closed"); dInfo("dnode peer server is closed");
} }
} }
...@@ -205,7 +205,8 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -205,7 +205,8 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
static int32_t dnodeInitClient() { static int32_t dnodeInitClient() {
char secret[TSDB_KEY_LEN] = "secret"; char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
...@@ -237,7 +238,7 @@ static void dnodeCleanupClient() { ...@@ -237,7 +238,7 @@ static void dnodeCleanupClient() {
} }
static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rspMsg = {.handle = pMsg->handle};
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
...@@ -272,14 +273,6 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -272,14 +273,6 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
} }
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg);
}
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet); dnodeGetMnodeEpSetForPeer(&epSet);
...@@ -293,20 +286,16 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c ...@@ -293,20 +286,16 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user)); tstrncpy(pMsg->user, user, sizeof(pMsg->user));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_AUTH;
dDebug("user:%s, send auth msg to mnodes", user); dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
} else { } else {
SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user); dDebug("user:%s, auth msg received from mnodes", user);
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN); memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN); memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi; *spi = pRsp->spi;
...@@ -317,7 +306,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c ...@@ -317,7 +306,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
return rpcRsp.code; return rpcRsp.code;
} }
static int32_t dnodeInitShell() { static int32_t dnodeInitShellServer() {
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
...@@ -344,7 +333,7 @@ static int32_t dnodeInitShell() { ...@@ -344,7 +333,7 @@ static int32_t dnodeInitShell() {
return 0; return 0;
} }
static void dnodeCleanupShell() { static void dnodeCleanupShellServer() {
if (tsTrans.shellRpc) { if (tsTrans.shellRpc) {
rpcClose(tsTrans.shellRpc); rpcClose(tsTrans.shellRpc);
tsTrans.shellRpc = NULL; tsTrans.shellRpc = NULL;
...@@ -356,11 +345,11 @@ int32_t dnodeInitTrans() { ...@@ -356,11 +345,11 @@ int32_t dnodeInitTrans() {
return -1; return -1;
} }
if (dnodeInitServer() != 0) { if (dnodeInitPeerServer() != 0) {
return -1; return -1;
} }
if (dnodeInitShell() != 0) { if (dnodeInitShellServer() != 0) {
return -1; return -1;
} }
...@@ -368,7 +357,15 @@ int32_t dnodeInitTrans() { ...@@ -368,7 +357,15 @@ int32_t dnodeInitTrans() {
} }
void dnodeCleanupTrans() { void dnodeCleanupTrans() {
dnodeCleanupShell(); dnodeCleanupShellServer();
dnodeCleanupServer(); dnodeCleanupPeerServer();
dnodeCleanupClient(); dnodeCleanupClient();
} }
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册