From feb4bbcde09624343cbed5e8cc58d89c5ffc4e33 Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Mon, 11 May 2020 13:10:56 +0000 Subject: [PATCH] support RpcIpSet change --- src/client/inc/tsclient.h | 2 +- src/client/src/tscServer.c | 8 +++++++- src/dnode/src/dnodeMgmt.c | 4 ++-- src/dnode/src/dnodePeer.c | 12 ++++++------ src/dnode/src/dnodeShell.c | 4 ++-- src/inc/trpc.h | 5 +---- src/rpc/src/rpcMain.c | 13 ++++++------- src/rpc/test/rclient.c | 13 +++---------- src/rpc/test/rsclient.c | 8 -------- src/rpc/test/rserver.c | 2 +- 10 files changed, 29 insertions(+), 42 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 61a5fdd311..6ea1ee6440 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -365,7 +365,7 @@ void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); -void tscProcessMsgFromServer(SRpcMsg *rpcMsg); +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet); int tscProcessSql(SSqlObj *pSql); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8efe89d28a..98cbe9dbde 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -221,7 +221,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL) { tscError("%p sql is already released", pSql->signature); @@ -245,6 +245,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { return; } + if (pCmd->command < TSDB_SQL_MGMT) { + if (pIpSet) pSql->ipList = *pIpSet; + } else { + if (pIpSet) tscMgmtIpSet = *pIpSet; + } + if (rpcMsg->pCont == NULL) { rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL; } else { diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0e91cc7155..36a7c98807 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -266,8 +266,8 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { return taosCfgDynamicOptions(pCfg->config); } -void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { - dPrint("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); +void dnodeUpdateIpSet(SRpcIpSet *pIpSet) { + dPrint("mnode IP list is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); for (int i = 0; i < pIpSet->numOfIps; ++i) { dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) } diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index ea21ed0206..51913d80c4 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -29,11 +29,11 @@ #include "dnodeVWrite.h" #include "mnode.h" -extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); +extern void dnodeUpdateIpSet(SRpcIpSet *pIpSet); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet); static void *tsDnodeServerRpc = NULL; static void *tsDnodeClientRpc = NULL; @@ -81,7 +81,7 @@ void dnodeCleanupServer() { } } -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SRpcMsg rspMsg; rspMsg.handle = pMsg->handle; rspMsg.pCont = NULL; @@ -119,7 +119,6 @@ int32_t dnodeInitClient() { rpcInit.label = "DND-C"; rpcInit.numOfThreads = 1; rpcInit.cfp = dnodeProcessRspFromDnode; - rpcInit.ufp = dnodeUpdateIpSet; rpcInit.sessions = 100; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; @@ -145,9 +144,10 @@ void dnodeCleanupClient() { } } -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) { +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { if (dnodeProcessRspMsgFp[pMsg->msgType]) { + if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) dnodeUpdateIpSet(pIpSet); (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); } else { dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 28679262fa..dc0efd405f 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -28,7 +28,7 @@ #include "dnodeShell.h" static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessMsgFromShell(SRpcMsg *pMsg); +static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static void * tsDnodeShellRpc = NULL; static int32_t tsDnodeQueryReqNum = 0; @@ -106,7 +106,7 @@ void dnodeCleanupShell() { } } -void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { +void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SRpcMsg rpcMsg; rpcMsg.handle = pMsg->handle; rpcMsg.pCont = NULL; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 8b082b65b8..eff210433f 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -66,10 +66,7 @@ typedef struct { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(SRpcMsg *); - - // call back to process notify the ipSet changes, for client app only - void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); + void (*cfp)(SRpcMsg *, SRpcIpSet *); // call back to retrieve the client auth info, for server app only int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a2333566f1..ca4b211be8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -55,9 +55,8 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void (*cfp)(SRpcMsg *); + void (*cfp)(SRpcMsg *, SRpcIpSet *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); - void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); void *idPool; // handle to ID pool void *tmrCtrl; // handle to timer @@ -222,7 +221,6 @@ void *rpcOpen(const SRpcInit *pInit) { if (pInit->secret) strcpy(pRpc->secret, pInit->secret); if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey); pRpc->spi = pInit->spi; - pRpc->ufp = pInit->ufp; pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; @@ -900,10 +898,11 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); } else { // for asynchronous API - if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)) - (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet + SRpcIpSet *pIpSet = NULL; + if (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) + pIpSet = &pContext->ipSet; - (*pRpc->cfp)(pMsg); + (*pRpc->cfp)(pMsg, pIpSet); } // free the request message @@ -924,7 +923,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); - (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response SRpcReqContext *pContext = pConn->pContext; diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 2aa1f0e4e9..ea1ebb5974 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -31,22 +31,16 @@ typedef struct { void *pRpc; } SInfo; -static void processResponse(SRpcMsg *pMsg) { +static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SInfo *pInfo = (SInfo *)pMsg->handle; tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); - rpcFreeCont(pMsg->pCont); + if (pIpSet) pInfo->ipSet = *pIpSet; + rpcFreeCont(pMsg->pCont); sem_post(&pInfo->rspSem); } -static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { - SInfo *pInfo = (SInfo *)handle; - - tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); - pInfo->ipSet = *pIpSet; -} - static int tcount = 0; static void *sendRequest(void *param) { @@ -99,7 +93,6 @@ int main(int argc, char *argv[]) { rpcInit.label = "APP"; rpcInit.numOfThreads = 1; rpcInit.cfp = processResponse; - rpcInit.ufp = processUpdateIpSet; rpcInit.sessions = 100; rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.user = "michael"; diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c index 683cbb590a..3b19d7a9ea 100644 --- a/src/rpc/test/rsclient.c +++ b/src/rpc/test/rsclient.c @@ -32,12 +32,6 @@ typedef struct { void *pRpc; } SInfo; -static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { - SInfo *pInfo = (SInfo *)handle; - - tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); - pInfo->ipSet = *pIpSet; -} static int tcount = 0; static int terror = 0; @@ -100,8 +94,6 @@ int main(int argc, char *argv[]) { rpcInit.localPort = 0; rpcInit.label = "APP"; rpcInit.numOfThreads = 1; - // rpcInit.cfp = processResponse; - rpcInit.ufp = processUpdateIpSet; rpcInit.sessions = 100; rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.user = "michael"; diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 9f781ef276..958d099027 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -113,7 +113,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char return ret; } -void processRequestMsg(SRpcMsg *pMsg) { +void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SRpcMsg *pTemp; pTemp = taosAllocateQitem(sizeof(SRpcMsg)); -- GitLab