提交 feb4bbcd 编写于 作者: J jtao1735

support RpcIpSet change

上级 bd49f058
...@@ -365,7 +365,7 @@ void tscInitMsgsFp(); ...@@ -365,7 +365,7 @@ void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
......
...@@ -221,7 +221,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -221,7 +221,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
if (pSql == NULL) { if (pSql == NULL) {
tscError("%p sql is already released", pSql->signature); tscError("%p sql is already released", pSql->signature);
...@@ -245,6 +245,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -245,6 +245,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
return; return;
} }
if (pCmd->command < TSDB_SQL_MGMT) {
if (pIpSet) pSql->ipList = *pIpSet;
} else {
if (pIpSet) tscMgmtIpSet = *pIpSet;
}
if (rpcMsg->pCont == NULL) { if (rpcMsg->pCont == NULL) {
rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL; rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
} else { } else {
......
...@@ -266,8 +266,8 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { ...@@ -266,8 +266,8 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
return taosCfgDynamicOptions(pCfg->config); return taosCfgDynamicOptions(pCfg->config);
} }
void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { void dnodeUpdateIpSet(SRpcIpSet *pIpSet) {
dPrint("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse); dPrint("mnode IP list is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse);
for (int i = 0; i < pIpSet->numOfIps; ++i) { for (int i = 0; i < pIpSet->numOfIps; ++i) {
dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i]) dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i])
} }
......
...@@ -29,11 +29,11 @@ ...@@ -29,11 +29,11 @@
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "mnode.h" #include "mnode.h"
extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); extern void dnodeUpdateIpSet(SRpcIpSet *pIpSet);
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg); static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet);
static void *tsDnodeServerRpc = NULL; static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL; static void *tsDnodeClientRpc = NULL;
...@@ -81,7 +81,7 @@ void dnodeCleanupServer() { ...@@ -81,7 +81,7 @@ void dnodeCleanupServer() {
} }
} }
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SRpcMsg rspMsg; SRpcMsg rspMsg;
rspMsg.handle = pMsg->handle; rspMsg.handle = pMsg->handle;
rspMsg.pCont = NULL; rspMsg.pCont = NULL;
...@@ -119,7 +119,6 @@ int32_t dnodeInitClient() { ...@@ -119,7 +119,6 @@ int32_t dnodeInitClient() {
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromDnode; rpcInit.cfp = dnodeProcessRspFromDnode;
rpcInit.ufp = dnodeUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
...@@ -145,9 +144,10 @@ void dnodeCleanupClient() { ...@@ -145,9 +144,10 @@ void dnodeCleanupClient() {
} }
} }
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) { static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
if (dnodeProcessRspMsgFp[pMsg->msgType]) { if (dnodeProcessRspMsgFp[pMsg->msgType]) {
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pIpSet) dnodeUpdateIpSet(pIpSet);
(*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg);
} else { } else {
dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include "dnodeShell.h" #include "dnodeShell.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void * tsDnodeShellRpc = NULL; static void * tsDnodeShellRpc = NULL;
static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeQueryReqNum = 0;
...@@ -106,7 +106,7 @@ void dnodeCleanupShell() { ...@@ -106,7 +106,7 @@ void dnodeCleanupShell() {
} }
} }
void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.handle = pMsg->handle; rpcMsg.handle = pMsg->handle;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
......
...@@ -66,10 +66,7 @@ typedef struct { ...@@ -66,10 +66,7 @@ typedef struct {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *); void (*cfp)(SRpcMsg *, SRpcIpSet *);
// call back to process notify the ipSet changes, for client app only
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
......
...@@ -55,9 +55,8 @@ typedef struct { ...@@ -55,9 +55,8 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *); void (*cfp)(SRpcMsg *, SRpcIpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
void *tmrCtrl; // handle to timer void *tmrCtrl; // handle to timer
...@@ -222,7 +221,6 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -222,7 +221,6 @@ void *rpcOpen(const SRpcInit *pInit) {
if (pInit->secret) strcpy(pRpc->secret, pInit->secret); if (pInit->secret) strcpy(pRpc->secret, pInit->secret);
if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey); if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey);
pRpc->spi = pInit->spi; pRpc->spi = pInit->spi;
pRpc->ufp = pInit->ufp;
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
...@@ -900,10 +898,11 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -900,10 +898,11 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
} else { } else {
// for asynchronous API // for asynchronous API
if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)) SRpcIpSet *pIpSet = NULL;
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet if (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)
pIpSet = &pContext->ipSet;
(*pRpc->cfp)(pMsg); (*pRpc->cfp)(pMsg, pIpSet);
} }
// free the request message // free the request message
...@@ -924,7 +923,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -924,7 +923,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->cfp))(&rpcMsg); (*(pRpc->cfp))(&rpcMsg, NULL);
} else { } else {
// it's a response // it's a response
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
......
...@@ -31,22 +31,16 @@ typedef struct { ...@@ -31,22 +31,16 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
static void processResponse(SRpcMsg *pMsg) { static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)pMsg->handle; SInfo *pInfo = (SInfo *)pMsg->handle;
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
rpcFreeCont(pMsg->pCont); if (pIpSet) pInfo->ipSet = *pIpSet;
rpcFreeCont(pMsg->pCont);
sem_post(&pInfo->rspSem); sem_post(&pInfo->rspSem);
} }
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet;
}
static int tcount = 0; static int tcount = 0;
static void *sendRequest(void *param) { static void *sendRequest(void *param) {
...@@ -99,7 +93,6 @@ int main(int argc, char *argv[]) { ...@@ -99,7 +93,6 @@ int main(int argc, char *argv[]) {
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse; rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael"; rpcInit.user = "michael";
......
...@@ -32,12 +32,6 @@ typedef struct { ...@@ -32,12 +32,6 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet;
}
static int tcount = 0; static int tcount = 0;
static int terror = 0; static int terror = 0;
...@@ -100,8 +94,6 @@ int main(int argc, char *argv[]) { ...@@ -100,8 +94,6 @@ int main(int argc, char *argv[]) {
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
// rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael"; rpcInit.user = "michael";
......
...@@ -113,7 +113,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char ...@@ -113,7 +113,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
return ret; return ret;
} }
void processRequestMsg(SRpcMsg *pMsg) { void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册