提交 63aa7bac 编写于 作者: S Shengliang Guan

refactor: redirect msg

上级 0161b645
......@@ -126,7 +126,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp);
......
......@@ -103,7 +103,7 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg
return -1;
}
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) {
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0 && pMsg->info.noResp != 0) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
taosThreadMutexUnlock(&queue->mutex);
return -1;
......
......@@ -200,47 +200,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0;
}
static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SMEpSet msg = {.epSet = epSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
SRpcMsg rsp = {
.code = TSDB_CODE_RPC_REDIRECT,
.info = pMsg->info,
.contLen = len,
};
rsp.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rsp.pCont, len, &msg);
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
}
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) {
pRsp->code = TSDB_CODE_NODE_OFFLINE;
rpcFreeCont(pReq->pCont);
pReq->pCont = NULL;
} else {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
}
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) {
......@@ -257,39 +216,38 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else {
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
rpcSendResponse(pMsg);
}
rpcSendResponse(pMsg);
}
}
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
SRpcMsg rsp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(len);
rsp.contLen = len;
tSerializeSMEpSet(rsp.pCont, len, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.info = pMsg->info;
rpcSendResponse(&rsp);
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
} else {
rpcRegisterBrokenLinkArg(pMsg);
}
......@@ -391,3 +349,34 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
};
return msgCb;
}
static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = epSet};
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
......@@ -287,7 +287,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->taskId = htobe64(tId);
req->refId = htobe64(rId);
SRpcMsg pMsg = {
SRpcMsg brokenMsg = {
.msgType = TDMT_VND_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
......@@ -295,7 +295,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
.info = *pConn,
};
tmsgRegisterBrokenLinkArg(&pMsg);
tmsgRegisterBrokenLinkArg(&brokenMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -321,7 +321,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SRpcMsg pMsg = {
SRpcMsg brokenMsg = {
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = msgSize,
......@@ -329,7 +329,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
.info = *pConn,
};
tmsgRegisterBrokenLinkArg(&pMsg);
tmsgRegisterBrokenLinkArg(&brokenMsg);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册