diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cc11254dcdbb6ff511a487e44405c578c1501ace..650ec13a8c7bace7bdfd1fe53925be4c0d0aa312 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -479,12 +479,8 @@ int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); -typedef struct { - SEpSet epSet; -} SMEpSet; - -int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq); -int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq); +int32_t tSerializeSEpSet(void* buf, int32_t bufLen, const SEpSet* pEpset); +int32_t tDeserializeSEpSet(void* buf, int32_t buflen, SEpSet* pEpset); typedef struct { int8_t connType; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 22ac4124340752415ef320effacc781568b20897..cee5b562414c5cec06e93c9094a949012b56293e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -665,22 +665,24 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) { taosArrayDestroy(pReq->pFields); pReq->pFields = NULL; } -int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) { + +int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) return -1; + if (tEncodeSEpSet(&encoder, pEpset) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; tEncoderClear(&encoder); return tlen; } -int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) { + +int32_t tDeserializeSEpSet(void *buf, int32_t bufLen, SEpSet *pEpset) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) return -1; + if (tDecodeSEpSet(&decoder, pEpset) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c86ace695b34b5d219dd2810f5b346e449fff296..c7509eb9d8a7e1ed47bbc65f8b8e1e2d15364ebc 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -206,29 +206,28 @@ static inline void dmSendRsp(SRpcMsg *pMsg) { } static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { - SMEpSet msg = {0}; - dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &msg.epSet); + SEpSet epSet = {0}; + dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet); - int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg); + int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); pMsg->pCont = rpcMallocCont(contLen); if (pMsg->pCont == NULL) { pMsg->code = TSDB_CODE_OUT_OF_MEMORY; } else { - tSerializeSMEpSet(pMsg->pCont, contLen, &msg); + tSerializeSEpSet(pMsg->pCont, contLen, &epSet); pMsg->contLen = contLen; } } static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; - SMEpSet msg = {.epSet = *pNewEpSet}; - int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg); + int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet); rsp.pCont = rpcMallocCont(contLen); if (rsp.pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; } else { - tSerializeSMEpSet(rsp.pCont, contLen, &msg); + tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet); rsp.contLen = contLen; } dmSendRsp(&rsp); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 03013a96ded9ccb6776bc8cf3ca68455033802ed..23634be77b3ca6f21f31019275353de12ab6c83b 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -20,6 +20,7 @@ #include "mndShow.h" #include "mndTrans.h" #include "mndUser.h" +#include "mndSync.h" #define MNODE_VER_NUMBER 1 #define MNODE_RESERVE_SIZE 64 @@ -222,23 +223,24 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) { } void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { - SSdb *pSdb = pMnode->pSdb; - pEpSet->numOfEps = 0; + SSdb *pSdb = pMnode->pSdb; + int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE); + void *pIter = NULL; - void *pIter = NULL; while (1) { SMnodeObj *pObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); if (pIter == NULL) break; - if (pObj->pDnode == NULL) { - mError("mnode:%d, no corresponding dnode exists", pObj->id); - } else { - if (pObj->id == pMnode->selfDnodeId || pObj->state == TAOS_SYNC_STATE_LEADER) { + + if (pObj->id == pMnode->selfDnodeId) { + if (mndIsMaster(pMnode)) { pEpSet->inUse = pEpSet->numOfEps; + } else { + pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes; } - addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); - sdbRelease(pSdb, pObj); } + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); + sdbRelease(pSdb, pObj); } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 16d836c8170886b6e9349a55201d2cc9d1224732..d5fcf9b1eb9bbc93ba623b09bbe3b8ff459b6240 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -236,6 +236,17 @@ void mndSyncStop(SMnode *pMnode) {} bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; + ESyncState state = syncGetMyRole(pMgmt->sync); - return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); + if (state != TAOS_SYNC_STATE_LEADER) { + terrno = TSDB_CODE_SYN_NOT_LEADER; + return false; + } + + if (!pMgmt->restored) { + terrno = TSDB_CODE_APP_NOT_READY; + return false; + } + + return true; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 4e4f69e01d746eb8c249a567b1bf166fc31aa6ba..5458bc5126914a4b33fa0617ca627924199313bd 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -408,46 +408,74 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { return code; } -int32_t mndProcessMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - void *ahandle = pMsg->info.ahandle; - mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle); - - if (IsReq(pMsg)) { - if (!mndIsMaster(pMnode)) { - terrno = TSDB_CODE_APP_NOT_READY; - mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - return -1; - } +static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) { + if (!IsReq(pMsg)) return 0; + if (mndIsMaster(pMsg->info.node)) return 0; - if (pMsg->contLen == 0 || pMsg->pCont == NULL) { - terrno = TSDB_CODE_INVALID_MSG_LEN; - mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - return -1; + if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || + pMsg->msgType == TDMT_MND_TRANS_TIMER) { + return -1; + } + mError("msg:%p, failed to check master since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + TMSG_INFO(pMsg->msgType)); + + SEpSet epSet = {0}; + mndGetMnodeEpSet(pMsg->info.node, &epSet); + +#if 0 + mTrace("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse); + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + mTrace("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); + if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { + epSet.inUse = (i + 1) % epSet.numOfEps; } } +#endif + + int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); + pMsg->info.rsp = rpcMallocCont(contLen); + if (pMsg->info.rsp != NULL) { + tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); + pMsg->info.rspLen = contLen; + terrno = TSDB_CODE_RPC_REDIRECT; + } else { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + + return -1; +} + +static int32_t mndCheckRequestValid(SRpcMsg *pMsg) { + if (!IsReq(pMsg)) return 0; + if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; + + mError("msg:%p, failed to valid request, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_INVALID_MSG_LEN; + return -1; +} + +int32_t mndProcessMsg(SRpcMsg *pMsg) { + if (mndCheckMnodeMaster(pMsg) != 0) return -1; + if (mndCheckRequestValid(pMsg) != 0) return -1; + SMnode *pMnode = pMsg->info.node; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; if (fp == NULL) { + mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED; - mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle); return -1; } + mTrace("msg:%p, will be processed in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); int32_t code = (*fp)(pMsg); if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - terrno = code; - mTrace("msg:%p, in progress, app:%p", pMsg, ahandle); - } else if (code != 0) { - if (terrno != TSDB_CODE_OPS_NOT_SUPPORT) { - mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - } else { - mTrace("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); - } + mTrace("msg:%p, won't response immediately since in progress", pMsg); + } else if (code == 0) { + mTrace("msg:%p, successfully processed and response", pMsg); } else { - mTrace("msg:%p, is processed, app:%p", pMsg, ahandle); + mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + TMSG_INFO(pMsg->msgType)); } - return code; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0313ea583220a72ad94ba5e833385089763551f5..49abc31f56f4dbef76f4c5458d9728a0476b9457 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -948,9 +948,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (pResp->contLen == 0) { pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; } else { - SMEpSet emsg = {0}; - tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); - pCtx->epSet = emsg.epSet; + SEpSet epSet = {0}; + tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); + pCtx->epSet = epSet; } addConnToPool(pThrd->pool, pConn); tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,