diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 61a5fdd311fa613e01ef78b8e36eac641c77f253..6ea1ee6440999d843f2326ffc2567b9ca79957f9 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -365,7 +365,7 @@ void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); -void tscProcessMsgFromServer(SRpcMsg *rpcMsg); +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet); int tscProcessSql(SSqlObj *pSql); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8efe89d28ab497ea7a442b412845d9b54172dce6..98cbe9dbdeff69e8de996b23069158b76328dac2 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -221,7 +221,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL) { tscError("%p sql is already released", pSql->signature); @@ -245,6 +245,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { return; } + if (pCmd->command < TSDB_SQL_MGMT) { + if (pIpSet) pSql->ipList = *pIpSet; + } else { + if (pIpSet) tscMgmtIpSet = *pIpSet; + } + if (rpcMsg->pCont == NULL) { rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL; } else { diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0e91cc7155daeb8057a560d80f78582cca584f85..36a7c9880735f8186368ff66c921ad499b31bc73 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -266,8 +266,8 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { return taosCfgDynamicOptions(pCfg->config); } -void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { - dPrint("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); +void dnodeUpdateIpSet(SRpcIpSet *pIpSet) { + dPrint("mnode IP list is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); for (int i = 0; i < pIpSet->numOfIps; ++i) { dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) } diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index ea21ed02061ce5aaf52222914b465d7d338229e0..51913d80c4531fabb4c525098b2d53fc0176a3f2 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -29,11 +29,11 @@ #include "dnodeVWrite.h" #include "mnode.h" -extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); +extern void dnodeUpdateIpSet(SRpcIpSet *pIpSet); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet); static void *tsDnodeServerRpc = NULL; static void *tsDnodeClientRpc = NULL; @@ -81,7 +81,7 @@ void dnodeCleanupServer() { } } -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SRpcMsg rspMsg; rspMsg.handle = pMsg->handle; rspMsg.pCont = NULL; @@ -119,7 +119,6 @@ int32_t dnodeInitClient() { rpcInit.label = "DND-C"; rpcInit.numOfThreads = 1; rpcInit.cfp = dnodeProcessRspFromDnode; - rpcInit.ufp = dnodeUpdateIpSet; rpcInit.sessions = 100; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; @@ -145,9 +144,10 @@ void dnodeCleanupClient() { } } -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) { +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { if (dnodeProcessRspMsgFp[pMsg->msgType]) { + if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) dnodeUpdateIpSet(pIpSet); (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); } else { dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 28679262faa7d38eebf8c49317b5df272ae2625e..dc0efd405f5b89056b16bec090b4fdd81c31e710 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -28,7 +28,7 @@ #include "dnodeShell.h" static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessMsgFromShell(SRpcMsg *pMsg); +static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static void * tsDnodeShellRpc = NULL; static int32_t tsDnodeQueryReqNum = 0; @@ -106,7 +106,7 @@ void dnodeCleanupShell() { } } -void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { +void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SRpcMsg rpcMsg; rpcMsg.handle = pMsg->handle; rpcMsg.pCont = NULL; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 8b082b65b8ad5c1e951e216c9b9192bf300ffa40..eff210433f7d7bcc2f4a5ad1d12bd88ed59581be 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -66,10 +66,7 @@ typedef struct { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(SRpcMsg *); - - // call back to process notify the ipSet changes, for client app only - void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); + void (*cfp)(SRpcMsg *, SRpcIpSet *); // call back to retrieve the client auth info, for server app only int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a2333566f194515efcc90980396262b23cf584a6..ca4b211be880e361f5433b932afbfa8553635abd 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -55,9 +55,8 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void (*cfp)(SRpcMsg *); + void (*cfp)(SRpcMsg *, SRpcIpSet *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); - void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); void *idPool; // handle to ID pool void *tmrCtrl; // handle to timer @@ -222,7 +221,6 @@ void *rpcOpen(const SRpcInit *pInit) { if (pInit->secret) strcpy(pRpc->secret, pInit->secret); if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey); pRpc->spi = pInit->spi; - pRpc->ufp = pInit->ufp; pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; @@ -900,10 +898,11 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); } else { // for asynchronous API - if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)) - (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet + SRpcIpSet *pIpSet = NULL; + if (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) + pIpSet = &pContext->ipSet; - (*pRpc->cfp)(pMsg); + (*pRpc->cfp)(pMsg, pIpSet); } // free the request message @@ -924,7 +923,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); - (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response SRpcReqContext *pContext = pConn->pContext; diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 2aa1f0e4e94c0b944b7ac0553c1188fd482c7111..ea1ebb5974691f9bd6a1244e6ad06de464d2b307 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -31,22 +31,16 @@ typedef struct { void *pRpc; } SInfo; -static void processResponse(SRpcMsg *pMsg) { +static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SInfo *pInfo = (SInfo *)pMsg->handle; tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); - rpcFreeCont(pMsg->pCont); + if (pIpSet) pInfo->ipSet = *pIpSet; + rpcFreeCont(pMsg->pCont); sem_post(&pInfo->rspSem); } -static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { - SInfo *pInfo = (SInfo *)handle; - - tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); - pInfo->ipSet = *pIpSet; -} - static int tcount = 0; static void *sendRequest(void *param) { @@ -99,7 +93,6 @@ int main(int argc, char *argv[]) { rpcInit.label = "APP"; rpcInit.numOfThreads = 1; rpcInit.cfp = processResponse; - rpcInit.ufp = processUpdateIpSet; rpcInit.sessions = 100; rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.user = "michael"; diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c index 683cbb590a5d198cd9d1c220c1a4fd6b544aa1fc..3b19d7a9ea5561e3641fbbbf2deae99c5794df87 100644 --- a/src/rpc/test/rsclient.c +++ b/src/rpc/test/rsclient.c @@ -32,12 +32,6 @@ typedef struct { void *pRpc; } SInfo; -static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { - SInfo *pInfo = (SInfo *)handle; - - tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); - pInfo->ipSet = *pIpSet; -} static int tcount = 0; static int terror = 0; @@ -100,8 +94,6 @@ int main(int argc, char *argv[]) { rpcInit.localPort = 0; rpcInit.label = "APP"; rpcInit.numOfThreads = 1; - // rpcInit.cfp = processResponse; - rpcInit.ufp = processUpdateIpSet; rpcInit.sessions = 100; rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.user = "michael"; diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 9f781ef276dc2788ae5bf7470431465649d7e542..958d099027f2072b82aee45fe302f0042c1fd8aa 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -113,7 +113,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char return ret; } -void processRequestMsg(SRpcMsg *pMsg) { +void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { SRpcMsg *pTemp; pTemp = taosAllocateQitem(sizeof(SRpcMsg));