diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d04fa9900d1c4915fde7c181bad8cbe3d9dc0686..e88da39d04e02c5a3283353d52fd3fed0c92d99c 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -298,7 +298,6 @@ typedef struct STscObj { char sversion[TSDB_VERSION_LEN]; char writeAuth : 1; char superAuth : 1; - void* pMgmtConn; struct SSqlObj * pSql; struct SSqlObj * pHb; struct SSqlObj * sqlList; @@ -358,7 +357,7 @@ typedef struct SSqlStream { struct SSqlStream *prev, *next; } SSqlStream; -int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn); +int32_t tscInitRpc(const char *user, const char *secret); void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); @@ -425,7 +424,7 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column); -extern void * pVnodeConn; +extern void * pDnodeConn; extern void * tscCacheHandle; extern void * tscTmr; extern void * tscQhandle; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 2bef303df5172a8887aa4ca873d7cecc3118750c..9c57554586a7403cf8cb26594ef5b00ce845e580 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -191,7 +191,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { } int tscSendMsgToServer(SSqlObj *pSql) { - STscObj* pObj = pSql->pTscObj; SSqlCmd* pCmd = &pSql->cmd; char *pMsg = rpcMallocCont(pCmd->payloadLen); @@ -201,30 +200,22 @@ int tscSendMsgToServer(SSqlObj *pSql) { } if (pSql->cmd.command < TSDB_SQL_MGMT) { - tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); + } else { + pSql->ipList = tscMgmtIpSet; + memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); + } + + tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port); - SRpcMsg rpcMsg = { + SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, .pCont = pMsg, .contLen = pSql->cmd.payloadLen, .handle = pSql, .code = 0 - }; - rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg); - } else { - pSql->ipList = tscMgmtIpSet; - memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); - SRpcMsg rpcMsg = { - .msgType = pSql->cmd.msgType, - .pCont = pMsg, - .contLen = pSql->cmd.payloadLen, - .handle = pSql, - .code = 0 - }; - tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg); - } + }; + rpcSendRequest(pDnodeConn, &pSql->ipList, &rpcMsg); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 1934c05014194deef8a8427615adf2855e5343cc..7d6768b144e5253bf30d5ccb8b969b8f1b4d7118 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -66,8 +66,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con return NULL; } - void* pMgmtConn = NULL; - if (tscInitRpc(user, pass, &pMgmtConn) != 0) { + if (tscInitRpc(user, pass) != 0) { terrno = TSDB_CODE_NETWORK_UNAVAIL; return NULL; } @@ -119,7 +118,6 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con strtolower(pObj->db, tmp); } - pObj->pMgmtConn = pMgmtConn; pthread_mutex_init(&pObj->mutex, NULL); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index c0ad91cd59a8b638134ab5234402e97922c0ae35..aa3c836ba01ea2df9c6a4b4cbc379c2fbb2c34ad 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -30,7 +30,7 @@ #include "tlocale.h" // global, not configurable -void * pVnodeConn; +void * pDnodeConn; void * tscCacheHandle; void * tscTmr; void * tscQhandle; @@ -48,12 +48,12 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } -int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) { +int32_t tscInitRpc(const char *user, const char *secret) { SRpcInit rpcInit; char secretEncrypt[32] = {0}; taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); - if (pVnodeConn == NULL) { + if (pDnodeConn == NULL) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC-vnode"; @@ -66,35 +66,13 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) { rpcInit.ckey = "key"; rpcInit.secret = secretEncrypt; - pVnodeConn = rpcOpen(&rpcInit); - if (pVnodeConn == NULL) { + pDnodeConn = rpcOpen(&rpcInit); + if (pDnodeConn == NULL) { tscError("failed to init connection to vnode"); return -1; } } - if (*pMgmtConn == NULL) { - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "TSC-mgmt"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = tscProcessMsgFromServer; - rpcInit.ufp = tscUpdateIpSet; - rpcInit.sessions = tsMaxMgmtConnections; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = 2000; - rpcInit.user = (char*)user; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.secret = secretEncrypt; - - *pMgmtConn = rpcOpen(&rpcInit); - if (*pMgmtConn == NULL) { - tscError("failed to init connection to mgmt"); - return -1; - } - } - return 0; } @@ -190,9 +168,9 @@ void taos_cleanup() { taosCloseLog(); - if (pVnodeConn != NULL) { - rpcClose(pVnodeConn); - pVnodeConn = NULL; + if (pDnodeConn != NULL) { + rpcClose(pDnodeConn); + pDnodeConn = NULL; } taosTmrCleanUp(tscTmr); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 506fa1a60560baf5da1e5f401a2ad0798651fcc3..1dbef2afde7dc7c1a2472c02b13e37da90b5c12d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -760,7 +760,6 @@ void tscCloseTscObj(STscObj* pObj) { if (pSql) { sem_destroy(&pSql->rspSem); } - rpcClose(pObj->pMgmtConn); pthread_mutex_destroy(&pObj->mutex); diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 70b55b9d92ddc065bcdc75fbeee4cbb8dd783164..b482ca64f4a7ca56ab0345d7c3de706c3bdbede2 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -1244,7 +1244,6 @@ bool taosCheckGlobalCfg() { tsVersion = 10 * tsVersion; - tsMnodeShellPort = tsServerPort + TSDB_PORT_MNODESHELL; // udp[6030-6034] tcp[6030] tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035] tsMnodeDnodePort = tsServerPort + TSDB_PORT_MNODEDNODE; // udp/tcp tsDnodeMnodePort = tsServerPort + TSDB_PORT_DNODEMNODE; // udp/tcp diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index e1aa48d47786898cb511b55a7913e8ee48d9158c..7d09cf2a0b8d0b4fb86c73dab9825bb6f9a3744a 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -17,6 +17,7 @@ #include "os.h" #include "taosdef.h" #include "tglobal.h" +#include "trpc.h" #include "mnode.h" #include "http.h" #include "monitor.h" diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 0c9716ca52cc48edab7a5bb37b68aa15b2b35dc7..581273d21c3521cf2eddc8b034953eaa58954425 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -34,6 +34,8 @@ static void * tsDnodeShellRpc = NULL; static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; +void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg); + int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; @@ -47,8 +49,8 @@ int32_t dnodeInitShell() { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = tsDnodeShellPort; - rpcInit.label = "DND-shell"; + rpcInit.localPort = tsMnodeShellPort; + rpcInit.label = "SHELL"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; @@ -96,13 +98,11 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); } else { - dError("RPC %p, msg:%s from shell is not handled", pMsg->handle, taosMsg[pMsg->msgType]); - rpcMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; - rpcSendResponse(&rpcMsg); - rpcFreeCont(pMsg->pCont); + mgmtProcessMsgFromShell(pMsg); } } + static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return TSDB_CODE_SUCCESS; } diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 35f7650c20e344f42ba8821fc0abeba50d035fe1..21955e29c124b91aa1d876b446647e78274dcf59 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -26,6 +26,8 @@ void mgmtCleanUpSystem(); void mgmtStopSystem(); void sdbUpdateSync(); +void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg); + #ifdef __cplusplus } #endif diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 1142f02922850e65e1bd85436ef46adb48219cc8..95310ae44a94e5fd2e463779f449de2627aa8170 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -329,8 +329,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_MAX_NORMAL_TABLES 1000 #define TSDB_MAX_CHILD_TABLES 100000 -#define TSDB_PORT_MNODESHELL 0 -#define TSDB_PORT_DNODESHELL 5 +#define TSDB_PORT_DNODESHELL 0 #define TSDB_PORT_DNODEMNODE 10 #define TSDB_PORT_MNODEDNODE 15 #define TSDB_PORT_SYNC 20 diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index b09cb5aff25510b94359f66b61cc7c781189cb2e..a5659fee54934c7cb3de61f3420f831619c412e5 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -41,9 +41,9 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); +//static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); -static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); +//static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); @@ -52,7 +52,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg); void *tsMgmtTmr; -static void *tsMgmtShellRpc = NULL; +//static void *tsMgmtShellRpc = NULL; static void *tsMgmtTranQhandle = NULL; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; static void *tsQhandleCache = NULL; @@ -70,28 +70,6 @@ int32_t mgmtInitShell() { tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT"); tsQhandleCache = taosCacheInit(tsMgmtTmr, 2); - int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; - if (numOfThreads < 1) { - numOfThreads = 1; - } - - SRpcInit rpcInit = {0}; - rpcInit.localPort = tsMnodeShellPort; - rpcInit.label = "MND-shell"; - rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = mgmtProcessMsgFromShell; - rpcInit.sessions = tsMaxShellConns; - rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.afp = mgmtShellRetriveAuth; - - tsMgmtShellRpc = rpcOpen(&rpcInit); - if (tsMgmtShellRpc == NULL) { - mError("failed to init server connection to shell"); - return -1; - } - - mPrint("server connection to shell is opened"); return 0; } @@ -101,12 +79,6 @@ void mgmtCleanUpShell() { tsMgmtTranQhandle = NULL; } - if (tsMgmtShellRpc) { - rpcClose(tsMgmtShellRpc); - tsMgmtShellRpc = NULL; - mPrint("server connection to shell is closed"); - } - if (tsQhandleCache) { taosCacheEmpty(tsQhandleCache); taosCacheCleanup(tsQhandleCache); @@ -148,7 +120,7 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { taosTmrReset(mgmtDoDealyedAddToShellQueue, 1000, queuedMsg, tsMgmtTmr, &unUsed); } -static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { +void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { assert(rpcMsg); if (rpcMsg->pCont == NULL) { @@ -370,6 +342,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); } +/* static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { *spi = 1; *encrypt = 0; @@ -390,6 +363,7 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr return TSDB_CODE_SUCCESS; } } +*/ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};