From 3b1add19f10a0a35f06f6b180d73d3fea713202f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 25 Oct 2021 10:38:15 +0800 Subject: [PATCH] [TD-10430] add dnode trans file --- include/server/mnode/mnode.h | 10 + include/util/tdef.h | 1 + source/server/dnode/inc/dnodeInt.h | 2 + source/server/dnode/inc/dnodeMain.h | 10 +- source/server/dnode/inc/dnodeTrans.h | 1 - source/server/dnode/src/dnodeInt.c | 7 +- source/server/dnode/src/dnodeMain.c | 8 +- source/server/dnode/src/dnodeTrans.c | 296 +++++++++++++-------------- source/server/mnode/inc/mnodeInt.h | 1 + source/server/mnode/src/mnodeAuth.c | 19 +- source/server/mnode/src/mondeInt.c | 6 +- 11 files changed, 189 insertions(+), 172 deletions(-) diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h index dab2b1e4ae..f15fc1792a 100644 --- a/include/server/mnode/mnode.h +++ b/include/server/mnode/mnode.h @@ -46,6 +46,16 @@ typedef struct { */ void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell); + /** + * Get the corresponding endpoint information from dnodeId. + * + * @param dnode, the instance of dDnode module. + * @param dnodeId, the id ot dnode. + * @param ep, the endpoint of dnode. + * @param fqdn, the fqdn of dnode. + * @param port, the port of dnode. + */ + void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); } SMnodeFp; typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index 008c9215ba..2cac7fc7f4 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -68,6 +68,7 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_DATA_NULL_STR "NULL" #define TSDB_DATA_NULL_STR_L "null" +#define TSDB_NETTEST_USER "nettestinternal" #define TSDB_DEFAULT_USER "root" #ifdef _TD_POWER_ #define TSDB_DEFAULT_PASS "powerdb" diff --git a/source/server/dnode/inc/dnodeInt.h b/source/server/dnode/inc/dnodeInt.h index 82cdfb52bf..38eac2794e 100644 --- a/source/server/dnode/inc/dnodeInt.h +++ b/source/server/dnode/inc/dnodeInt.h @@ -19,7 +19,9 @@ #ifdef __cplusplus extern "C" { #endif +#include "os.h" #include "taosmsg.h" +#include "tglobal.h" #include "tlog.h" #include "trpc.h" #include "dnode.h" diff --git a/source/server/dnode/inc/dnodeMain.h b/source/server/dnode/inc/dnodeMain.h index 245ede0001..325136caa6 100644 --- a/source/server/dnode/inc/dnodeMain.h +++ b/source/server/dnode/inc/dnodeMain.h @@ -22,10 +22,10 @@ extern "C" { #include "dnodeInt.h" typedef enum { - TD_RUN_STAT_INIT, - TD_RUN_STAT_RUNNING, - TD_RUN_STAT_STOPPED -} RunStat; + DN_RUN_STAT_INIT, + DN_RUN_STAT_RUNNING, + DN_RUN_STAT_STOPPED +} EDnRunStat; int32_t dnodeInitMain(); void dnodeCleanupMain(); @@ -36,7 +36,7 @@ void dnodeReportStartupFinished(char *name, char *desc); void dnodeProcessStartupReq(SRpcMsg *pMsg); void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); -RunStat dnodeGetRunStat(); +EDnRunStat dnodeGetRunStat(); void dnodeSetRunStat(); void* dnodeGetTimer(); diff --git a/source/server/dnode/inc/dnodeTrans.h b/source/server/dnode/inc/dnodeTrans.h index 631c69d11c..f2dc647de3 100644 --- a/source/server/dnode/inc/dnodeTrans.h +++ b/source/server/dnode/inc/dnodeTrans.h @@ -25,7 +25,6 @@ int32_t dnodeInitTrans(); void dnodeCleanupTrans(); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); -void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); #ifdef __cplusplus } diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c index 8b7f2f0445..7ece1917a7 100644 --- a/source/server/dnode/src/dnodeInt.c +++ b/source/server/dnode/src/dnodeInt.c @@ -46,6 +46,7 @@ static int32_t dnodeInitVnodeModule(void **unused) { static int32_t dnodeInitMnodeModule(void **unused) { SMnodePara para; + para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendRedirectMsg = dnodeSendRedirectMsg; @@ -77,7 +78,7 @@ int32_t dnodeInit() { taosStepExec(tsSteps); - dnodeSetRunStat(TD_RUN_STAT_RUNNING); + dnodeSetRunStat(DN_RUN_STAT_RUNNING); dnodeReportStartupFinished("TDengine", "initialized successfully"); dInfo("TDengine is initialized successfully"); @@ -85,8 +86,8 @@ int32_t dnodeInit() { } void dnodeCleanup() { - if (dnodeGetRunStat() != TD_RUN_STAT_STOPPED) { - dnodeSetRunStat(TD_RUN_STAT_STOPPED); + if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) { + dnodeSetRunStat(DN_RUN_STAT_STOPPED); taosStepCleanup(tsSteps); tsSteps = NULL; } diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c index bd8dbe5994..50702823f1 100644 --- a/source/server/dnode/src/dnodeMain.c +++ b/source/server/dnode/src/dnodeMain.c @@ -29,7 +29,7 @@ #include "mnode.h" static struct { - RunStat runStatus; + EDnRunStat runStatus; void * dnodeTimer; SStartupStep startup; } tsDmain; @@ -55,7 +55,7 @@ static void dnodeCheckDataDirOpenned(char *dir) { } int32_t dnodeInitMain() { - tsDmain.runStatus = TD_RUN_STAT_STOPPED; + tsDmain.runStatus = DN_RUN_STAT_STOPPED; tsDmain.dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); if (tsDmain.dnodeTimer == NULL) { dError("failed to init dnode timer"); @@ -260,8 +260,8 @@ void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -RunStat dnodeGetRunStat() { return tsDmain.runStatus; } +EDnRunStat dnodeGetRunStat() { return tsDmain.runStatus; } -void dnodeSetRunStat(RunStat stat) { tsDmain.runStatus = stat; } +void dnodeSetRunStat(EDnRunStat stat) { tsDmain.runStatus = stat; } void* dnodeGetTimer() { return tsDmain.dnodeTimer; } \ No newline at end of file diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 8eef9975e7..68434609c2 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -20,21 +20,19 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "tglobal.h" +#include "dnodeTrans.h" #include "dnodeMain.h" #include "dnodeMnodeEps.h" #include "dnodeStatus.h" -#include "dnodeTrans.h" -#include "vnode.h" #include "mnode.h" +#include "vnode.h" -typedef void (*RpcMsgFp)( SRpcMsg *pMsg); +typedef void (*RpcMsgFp)(SRpcMsg *pMsg); static struct { - void * serverRpc; - void * clientRpc; - void * shellRpc; + void *serverRpc; + void *clientRpc; + void *shellRpc; int32_t queryReqNum; int32_t submitReqNum; RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX]; @@ -43,18 +41,18 @@ static struct { static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; + int32_t msgType = pMsg->msgType; - if (pMsg->pCont == NULL) return; - if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { + if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { dnodeProcessStartupReq(pMsg); return; } - if (dnodeGetRunStat() != TD_RUN_STAT_RUNNING) { + if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { rspMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); - dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); + dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); return; } @@ -64,38 +62,40 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { return; } - RpcMsgFp fp = tsTrans.peerMsgFp[pMsg->msgType]; + RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; if (fp != NULL) { + dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { - dDebug("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); + dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]); rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); } } -int32_t dnodeInitServer() { - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg; +static int32_t dnodeInitServer() { + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg; + + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -117,7 +117,7 @@ int32_t dnodeInitServer() { return 0; } -void dnodeCleanupServer() { +static void dnodeCleanupServer() { if (tsTrans.serverRpc) { rpcClose(tsTrans.serverRpc); tsTrans.serverRpc = NULL; @@ -125,65 +125,66 @@ void dnodeCleanupServer() { } } -static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - if (dnodeGetRunStat() == TD_RUN_STAT_STOPPED) { +static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + int32_t msgType = pMsg->msgType; + + if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("msg:%p is ignored since dnode is stopping", pMsg); + dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); rpcFreeCont(pMsg->pCont); return; } - if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { + if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { dnodeUpdateMnodeFromPeer(pEpSet); } - RpcMsgFp fp = tsTrans.peerMsgFp[pMsg->msgType]; + RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; if (fp != NULL) { + dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { - dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; - rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; - rpcSendResponse(&rspMsg); + dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); } rpcFreeCont(pMsg->pCont); } -int32_t dnodeInitClient() { - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg; +static int32_t dnodeInitClient() { + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg; + + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; - char secret[TSDB_KEY_LEN] = "secret"; + char secret[TSDB_KEY_LEN] = "secret"; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = "DND-C"; + rpcInit.label = "DND-C"; rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessRspFromPeer; - rpcInit.sessions = TSDB_MAX_VNODES << 4; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.user = "t"; - rpcInit.ckey = "key"; - rpcInit.secret = secret; + rpcInit.cfp = dnodeProcessPeerRsp; + rpcInit.sessions = TSDB_MAX_VNODES << 4; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.user = "t"; + rpcInit.ckey = "key"; + rpcInit.secret = secret; tsTrans.clientRpc = rpcOpen(&rpcInit); if (tsTrans.clientRpc == NULL) { @@ -195,7 +196,7 @@ int32_t dnodeInitClient() { return 0; } -void dnodeCleanupClient() { +static void dnodeCleanupClient() { if (tsTrans.clientRpc) { rpcClose(tsTrans.clientRpc); tsTrans.clientRpc = NULL; @@ -203,59 +204,50 @@ void dnodeCleanupClient() { } } -static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; +static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; + int32_t msgType = pMsg->msgType; - if (pMsg->pCont == NULL) return; - if (dnodeGetRunStat() == TD_RUN_STAT_STOPPED) { - dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]); - rpcMsg.code = TSDB_CODE_DND_EXITING; - rpcSendResponse(&rpcMsg); + if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { + dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]); + rspMsg.code = TSDB_CODE_DND_EXITING; + rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); return; - } else if (dnodeGetRunStat() != TD_RUN_STAT_RUNNING) { - dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); - rpcMsg.code = TSDB_CODE_APP_NOT_READY; - rpcSendResponse(&rpcMsg); + } else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { + dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); + rspMsg.code = TSDB_CODE_APP_NOT_READY; + rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); return; } - if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { + if (pMsg->pCont == NULL) { + rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN; + rpcSendResponse(&rspMsg); + return; + } + + if (msgType == TSDB_MSG_TYPE_QUERY) { atomic_fetch_add_32(&tsTrans.queryReqNum, 1); - } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { + } else if (msgType == TSDB_MSG_TYPE_SUBMIT) { atomic_fetch_add_32(&tsTrans.submitReqNum, 1); - } else {} + } else { + } - RpcMsgFp fp = tsTrans.shellMsgFp[pMsg->msgType]; + RpcMsgFp fp = tsTrans.shellMsgFp[msgType]; if (fp != NULL) { + dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { - dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); - rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; - rpcSendResponse(&rpcMsg); + dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]); + rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); } } -static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - if (strcmp(user, "nettestinternal") == 0) { - char pass[32] = {0}; - taosEncryptPass((uint8_t *)user, strlen(user), pass); - *spi = 0; - *encrypt = 0; - *ckey = 0; - memcpy(secret, pass, TSDB_KEY_LEN); - dTrace("nettest user is authorized"); - return 0; - } - - return -1; -} - -void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); -} +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { SRpcEpSet epSet = {0}; @@ -263,19 +255,13 @@ void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { dnodeSendMsgToDnode(&epSet, rpcMsg); } -void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { +static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; dnodeGetEpSetForPeer(&epSet); rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); } -void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { - rpcSendRecv(tsTrans.clientRpc, epSet, rpcMsg, rpcRsp); -} - static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - if (dnodeAuthNetTest(user, spi, encrypt, secret, ckey) == 0) return 0; - int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); if (code != TSDB_CODE_APP_NOT_READY) return code; @@ -306,52 +292,52 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c return rpcRsp.code; } -int32_t dnodeInitShell() { +static int32_t dnodeInitShell() { tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; // the following message shall be treated as mnode write - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg; // the following message shall be treated as mnode query - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { @@ -360,14 +346,14 @@ int32_t dnodeInitShell() { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = tsDnodeShellPort; - rpcInit.label = "SHELL"; + rpcInit.localPort = tsDnodeShellPort; + rpcInit.label = "SHELL"; rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = dnodeProcessMsgFromShell; - rpcInit.sessions = tsMaxShellConns; - rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.afp = dnodeRetrieveUserAuthInfo; + rpcInit.cfp = dnodeProcessShellReq; + rpcInit.sessions = tsMaxShellConns; + rpcInit.connType = TAOS_CONN_SERVER; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.afp = dnodeRetrieveUserAuthInfo; tsTrans.shellRpc = rpcOpen(&rpcInit); if (tsTrans.shellRpc == NULL) { @@ -379,7 +365,7 @@ int32_t dnodeInitShell() { return 0; } -void dnodeCleanupShell() { +static void dnodeCleanupShell() { if (tsTrans.shellRpc) { rpcClose(tsTrans.shellRpc); tsTrans.shellRpc = NULL; diff --git a/source/server/mnode/inc/mnodeInt.h b/source/server/mnode/inc/mnodeInt.h index 654822ce40..42d3c53fa2 100644 --- a/source/server/mnode/inc/mnodeInt.h +++ b/source/server/mnode/inc/mnodeInt.h @@ -30,6 +30,7 @@ EMnStatus mnodeGetStatus(); void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); +void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); #ifdef __cplusplus } diff --git a/source/server/mnode/src/mnodeAuth.c b/source/server/mnode/src/mnodeAuth.c index f5b56e2968..bb3289ebeb 100644 --- a/source/server/mnode/src/mnodeAuth.c +++ b/source/server/mnode/src/mnodeAuth.c @@ -15,7 +15,22 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "mnodeInt.h" +#include "mnodeAuth.h" int32_t mnodeInitAuth() { return 0; } -void mnodeCleanupAuth() {} \ No newline at end of file +void mnodeCleanupAuth() {} + +int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + if (strcmp(user, TSDB_NETTEST_USER) == 0) { + char pass[32] = {0}; + taosEncryptPass((uint8_t *)user, strlen(user), pass); + *spi = 0; + *encrypt = 0; + *ckey = 0; + memcpy(secret, pass, TSDB_KEY_LEN); + mDebug("nettest user is authorized"); + return 0; + } + + return 0; +} \ No newline at end of file diff --git a/source/server/mnode/src/mondeInt.c b/source/server/mnode/src/mondeInt.c index 41cf5f14b3..3d48ea1587 100644 --- a/source/server/mnode/src/mondeInt.c +++ b/source/server/mnode/src/mondeInt.c @@ -61,6 +61,10 @@ void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsMint.fp.SendMsgToM void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.fp.SendRedirectMsg)(rpcMsg, forShell); } +void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) { + (*tsMint.fp.GetDnodeEp)(dnodeId, ep, fqdn, port); +} + int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; } static int32_t mnodeSetPara(SMnodePara para) { @@ -242,5 +246,3 @@ void mnodeCleanup() { mInfo("mnode is cleaned up"); } } - -int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; } -- GitLab