提交 b765f21e 编写于 作者: S Shengliang Guan

refactor: adjust SRpcMsg

上级 7729ce2e
......@@ -42,8 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(const SRpcMsg* pRsp);
typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, SRpcMsg* pRsp);
typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*SendRedirectRspFp)(const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc);
......@@ -56,7 +55,6 @@ typedef struct {
GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp;
SendRspFp sendRspFp;
SendMnodeRecvFp sendMnodeRecvFp;
SendRedirectRspFp sendRedirectRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
......
......@@ -46,21 +46,9 @@ void tmsgSendRsp(SRpcMsg* pRsp) {
}
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
// cannot be empty, but not checked for faster detect
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
if (fp != NULL) {
(*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
}
void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp) {
SendMnodeRecvFp fp = tsDefaultMsgCb.sendMnodeRecvFp;
if (fp != NULL) {
(*fp)(tsDefaultMsgCb.pWrapper, pReq, pRsp);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
(*fp)(pRsp, pNewEpSet);
}
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
......
......@@ -87,7 +87,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcRsp = {0};
dTrace("send status msg to mnode, app:%p", rpcMsg.info.ahandle);
tmsgSendMnodeRecv(&rpcMsg, &rpcRsp);
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp);
dmProcessStatusRsp(pMgmt, &rpcRsp);
}
......
......@@ -21,26 +21,6 @@
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SDnodeData *pData = &pDnode->data;
taosRLockLatch(&pData->latch);
*pEpSet = pData->mnodeEps;
taosRUnLockLatch(&pData->latch);
}
static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SDnodeData *pData = &pDnode->data;
taosWLockLatch(&pData->latch);
pData->mnodeEps = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pData->latch);
}
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) {
......@@ -214,7 +194,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
......@@ -257,12 +237,6 @@ static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRp
}
}
static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0};
dmGetMnodeEpSet(pWrapper->pDnode, &epSet);
dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp);
}
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode;
if (pDnode->status != DND_STAT_RUNNING || pDnode->trans.clientRpc == NULL) {
......@@ -286,7 +260,8 @@ static inline void dmSendRsp(const SRpcMsg *pRsp) {
}
}
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
} else {
......@@ -407,7 +382,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SRpcMsg rpcRsp = {0};
SEpSet epSet = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dmGetMnodeEpSet(pDnode, &epSet);
dmGetMnodeEpSet(&pDnode->data, &epSet);
dmSendRecv(pDnode, &epSet, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
......@@ -469,7 +444,6 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.clientRpc = pWrapper->pDnode->trans.clientRpc,
.sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp,
.sendMnodeRecvFp = dmSendToMnodeRecv,
.sendRedirectRspFp = dmSendRedirectRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,
......
......@@ -179,6 +179,8 @@ int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm);
int32_t dmReadEps(SDnodeData *pData);
int32_t dmWriteEps(SDnodeData *pData);
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
#ifdef __cplusplus
}
......
......@@ -307,3 +307,21 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
taosRUnLockLatch(&pData->latch);
return changed;
}
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
taosRLockLatch(&pData->latch);
*pEpSet = pData->mnodeEps;
taosRUnLockLatch(&pData->latch);
}
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
taosWLockLatch(&pData->latch);
pData->mnodeEps = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pData->latch);
}
......@@ -440,9 +440,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d msglen:%d", pConn, TMSG_INFO(pHead->msgType),
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port));
ntohs(pConn->locaddr.sin_port), len);
pHead->msgLen = htonl(len);
wb->base = msg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册