未验证 提交 040817a2 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12716 from taosdata/fix/dnode

refactor: redirect msg
...@@ -126,7 +126,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -126,7 +126,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet); syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
tmsgSendRedirectRsp(&rsp, &newEpSet); tmsgSendRedirectRsp(&rsp, &newEpSet);
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR; rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
......
...@@ -103,7 +103,7 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg ...@@ -103,7 +103,7 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg
return -1; return -1;
} }
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) { if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0 && pMsg->info.noResp == 0) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) { if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
return -1; return -1;
......
...@@ -200,47 +200,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -200,47 +200,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0; return 0;
} }
static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SMEpSet msg = {.epSet = epSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
SRpcMsg rsp = {
.code = TSDB_CODE_RPC_REDIRECT,
.info = pMsg->info,
.contLen = len,
};
rsp.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rsp.pCont, len, &msg);
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
}
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) {
pRsp->code = TSDB_CODE_NODE_OFFLINE;
rpcFreeCont(pReq->pCont);
pReq->pCont = NULL;
} else {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
}
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) { if (pDnode->status != DND_STAT_RUNNING) {
...@@ -257,39 +216,38 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -257,39 +216,38 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
static inline void dmSendRsp(SRpcMsg *pMsg) { static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) { if (InChildProc(pWrapper)) {
dmSendRpcRedirectRsp(pMsg); dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else { } else {
if (InChildProc(pWrapper)) { rpcSendResponse(pMsg);
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
rpcSendResponse(pMsg);
}
} }
} }
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
if (InChildProc(pWrapper)) { SMEpSet msg = {.epSet = *pNewEpSet};
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
SRpcMsg rsp = {0}; tSerializeSMEpSet(rsp.pCont, contLen, &msg);
SMEpSet msg = {.epSet = *pNewEpSet}; rsp.contLen = contLen;
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(len);
rsp.contLen = len;
tSerializeSMEpSet(rsp.pCont, len, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.info = pMsg->info;
rpcSendResponse(&rsp);
} }
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} }
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper; SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) { if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST); dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else { } else {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
} }
...@@ -391,3 +349,34 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) { ...@@ -391,3 +349,34 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
}; };
return msgCb; return msgCb;
} }
static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = epSet};
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
...@@ -287,7 +287,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { ...@@ -287,7 +287,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->taskId = htobe64(tId); req->taskId = htobe64(tId);
req->refId = htobe64(rId); req->refId = htobe64(rId);
SRpcMsg pMsg = { SRpcMsg brokenMsg = {
.msgType = TDMT_VND_DROP_TASK, .msgType = TDMT_VND_DROP_TASK,
.pCont = req, .pCont = req,
.contLen = sizeof(STaskDropReq), .contLen = sizeof(STaskDropReq),
...@@ -295,7 +295,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { ...@@ -295,7 +295,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
.info = *pConn, .info = *pConn,
}; };
tmsgRegisterBrokenLinkArg(&pMsg); tmsgRegisterBrokenLinkArg(&brokenMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -321,7 +321,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * ...@@ -321,7 +321,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SRpcMsg pMsg = { SRpcMsg brokenMsg = {
.msgType = TDMT_VND_QUERY_HEARTBEAT, .msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg, .pCont = msg,
.contLen = msgSize, .contLen = msgSize,
...@@ -329,7 +329,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * ...@@ -329,7 +329,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
.info = *pConn, .info = *pConn,
}; };
tmsgRegisterBrokenLinkArg(&pMsg); tmsgRegisterBrokenLinkArg(&brokenMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册