diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 9f77180cd890f73c514c22be813c538c8564e513..eec6bb3fb4e2c768bc63500d092874150ea72287 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -126,7 +126,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet); newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; tmsgSendRedirectRsp(&rsp, &newEpSet); - } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR; tmsgSendRsp(&rsp); diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index d486707c5eee16a77f2e78ee1f804fd0cebe42b6..f162069be994f400bf01880640c19aa6bc656e89 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -103,7 +103,7 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg return -1; } - if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) { + if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0 && pMsg->info.noResp != 0) { if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) { taosThreadMutexUnlock(&queue->mutex); return -1; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index ee9008b1982cb57f858b184d099badcb61c38ef4..4cf0e342a5acaacd2d1c0a5d90dc06f39ef24af3 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -200,47 +200,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { return 0; } -static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) { - SDnode *pDnode = dmInstance(); - SEpSet epSet = {0}; - dmGetMnodeEpSet(&pDnode->data, &epSet); - - dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse); - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { - epSet.inUse = (i + 1) % epSet.numOfEps; - } - - epSet.eps[i].port = htons(epSet.eps[i].port); - } - - SMEpSet msg = {.epSet = epSet}; - int32_t len = tSerializeSMEpSet(NULL, 0, &msg); - - SRpcMsg rsp = { - .code = TSDB_CODE_RPC_REDIRECT, - .info = pMsg->info, - .contLen = len, - }; - rsp.pCont = rpcMallocCont(len); - tSerializeSMEpSet(rsp.pCont, len, &msg); - rpcSendResponse(&rsp); - - rpcFreeCont(pMsg->pCont); -} - -static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { - SDnode *pDnode = dmInstance(); - if (pDnode->status != DND_STAT_RUNNING) { - pRsp->code = TSDB_CODE_NODE_OFFLINE; - rpcFreeCont(pReq->pCont); - pReq->pCont = NULL; - } else { - rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); - } -} - static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); if (pDnode->status != DND_STAT_RUNNING) { @@ -257,39 +216,38 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { static inline void dmSendRsp(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (pMsg->code == TSDB_CODE_NODE_REDIRECT) { - dmSendRpcRedirectRsp(pMsg); + if (InChildProc(pWrapper)) { + dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; } else { - if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); - } else { - rpcSendResponse(pMsg); - } + rpcSendResponse(pMsg); } } static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { - SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; + SMEpSet msg = {.epSet = *pNewEpSet}; + int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg); + + rsp.pCont = rpcMallocCont(contLen); + if (rsp.pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; } else { - SRpcMsg rsp = {0}; - SMEpSet msg = {.epSet = *pNewEpSet}; - int32_t len = tSerializeSMEpSet(NULL, 0, &msg); - rsp.pCont = rpcMallocCont(len); - rsp.contLen = len; - tSerializeSMEpSet(rsp.pCont, len, &msg); - - rsp.code = TSDB_CODE_RPC_REDIRECT; - rsp.info = pMsg->info; - rpcSendResponse(&rsp); + tSerializeSMEpSet(rsp.pCont, contLen, &msg); + rsp.contLen = contLen; } + dmSendRsp(&rsp); + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; } static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; if (InChildProc(pWrapper)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST); + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; } else { rpcRegisterBrokenLinkArg(pMsg); } @@ -391,3 +349,34 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) { }; return msgCb; } + +static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) { + SDnode *pDnode = dmInstance(); + SEpSet epSet = {0}; + dmGetMnodeEpSet(&pDnode->data, &epSet); + + dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse); + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); + if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { + epSet.inUse = (i + 1) % epSet.numOfEps; + } + + epSet.eps[i].port = htons(epSet.eps[i].port); + } + + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; + SMEpSet msg = {.epSet = epSet}; + int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg); + rsp.pCont = rpcMallocCont(contLen); + if (rsp.pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } else { + tSerializeSMEpSet(rsp.pCont, contLen, &msg); + rsp.contLen = contLen; + } + + dmSendRsp(&rsp); + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; +} diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index d502d952f31671679728c62180fddb8b29c40488..60270d3e065ec7e1f8fb9878293ac2c948951dea 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -287,7 +287,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { req->taskId = htobe64(tId); req->refId = htobe64(rId); - SRpcMsg pMsg = { + SRpcMsg brokenMsg = { .msgType = TDMT_VND_DROP_TASK, .pCont = req, .contLen = sizeof(STaskDropReq), @@ -295,7 +295,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { .info = *pConn, }; - tmsgRegisterBrokenLinkArg(&pMsg); + tmsgRegisterBrokenLinkArg(&brokenMsg); return TSDB_CODE_SUCCESS; } @@ -321,7 +321,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SRpcMsg pMsg = { + SRpcMsg brokenMsg = { .msgType = TDMT_VND_QUERY_HEARTBEAT, .pCont = msg, .contLen = msgSize, @@ -329,7 +329,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * .info = *pConn, }; - tmsgRegisterBrokenLinkArg(&pMsg); + tmsgRegisterBrokenLinkArg(&brokenMsg); return TSDB_CODE_SUCCESS; }