提交 9a1959c4 编写于 作者: S Shengliang Guan

fix: return mnode epset while master changed

上级 080c596d
...@@ -479,12 +479,8 @@ int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp); ...@@ -479,12 +479,8 @@ int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
typedef struct { int32_t tSerializeSEpSet(void* buf, int32_t bufLen, const SEpSet* pEpset);
SEpSet epSet; int32_t tDeserializeSEpSet(void* buf, int32_t buflen, SEpSet* pEpset);
} SMEpSet;
int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq);
int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq);
typedef struct { typedef struct {
int8_t connType; int8_t connType;
......
...@@ -665,22 +665,24 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) { ...@@ -665,22 +665,24 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
taosArrayDestroy(pReq->pFields); taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL; pReq->pFields = NULL;
} }
int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) return -1; if (tEncodeSEpSet(&encoder, pEpset) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tEncoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
int32_t tDeserializeSEpSet(void *buf, int32_t bufLen, SEpSet *pEpset) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) return -1; if (tDecodeSEpSet(&decoder, pEpset) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
......
...@@ -206,29 +206,28 @@ static inline void dmSendRsp(SRpcMsg *pMsg) { ...@@ -206,29 +206,28 @@ static inline void dmSendRsp(SRpcMsg *pMsg) {
} }
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SMEpSet msg = {0}; SEpSet epSet = {0};
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &msg.epSet); dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg); int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->pCont = rpcMallocCont(contLen); pMsg->pCont = rpcMallocCont(contLen);
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY; pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
tSerializeSMEpSet(pMsg->pCont, contLen, &msg); tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
pMsg->contLen = contLen; pMsg->contLen = contLen;
} }
} }
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = *pNewEpSet}; int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
rsp.pCont = rpcMallocCont(contLen); rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) { if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
tSerializeSMEpSet(rsp.pCont, contLen, &msg); tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
rsp.contLen = contLen; rsp.contLen = contLen;
} }
dmSendRsp(&rsp); dmSendRsp(&rsp);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndSync.h"
#define MNODE_VER_NUMBER 1 #define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64 #define MNODE_RESERVE_SIZE 64
...@@ -223,23 +224,24 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) { ...@@ -223,23 +224,24 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
pEpSet->numOfEps = 0; int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SMnodeObj *pObj = NULL; SMnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pObj->pDnode == NULL) {
mError("mnode:%d, no corresponding dnode exists", pObj->id); if (pObj->id == pMnode->selfDnodeId) {
} else { if (mndIsMaster(pMnode)) {
if (pObj->id == pMnode->selfDnodeId || pObj->state == TAOS_SYNC_STATE_LEADER) {
pEpSet->inUse = pEpSet->numOfEps; pEpSet->inUse = pEpSet->numOfEps;
} else {
pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
}
} }
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
} }
}
} }
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
......
...@@ -236,6 +236,17 @@ void mndSyncStop(SMnode *pMnode) {} ...@@ -236,6 +236,17 @@ void mndSyncStop(SMnode *pMnode) {}
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
ESyncState state = syncGetMyRole(pMgmt->sync); ESyncState state = syncGetMyRole(pMgmt->sync);
return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); if (state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
return false;
}
if (!pMgmt->restored) {
terrno = TSDB_CODE_APP_NOT_READY;
return false;
}
return true;
} }
...@@ -408,46 +408,74 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -408,46 +408,74 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
return code; return code;
} }
int32_t mndProcessMsg(SRpcMsg *pMsg) { static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; if (!IsReq(pMsg)) return 0;
void *ahandle = pMsg->info.ahandle; if (mndIsMaster(pMsg->info.node)) return 0;
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
if (IsReq(pMsg)) { if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
if (!mndIsMaster(pMnode)) { pMsg->msgType == TDMT_MND_TRANS_TIMER) {
terrno = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
return -1; return -1;
} }
mError("msg:%p, failed to check master since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
if (pMsg->contLen == 0 || pMsg->pCont == NULL) { SEpSet epSet = {0};
terrno = TSDB_CODE_INVALID_MSG_LEN; mndGetMnodeEpSet(pMsg->info.node, &epSet);
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
return -1; #if 0
mTrace("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
mTrace("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
} }
#endif
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen);
if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.rspLen = contLen;
terrno = TSDB_CODE_RPC_REDIRECT;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} }
return -1;
}
static int32_t mndCheckRequestValid(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
mError("msg:%p, failed to valid request, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN;
return -1;
}
int32_t mndProcessMsg(SRpcMsg *pMsg) {
if (mndCheckMnodeMaster(pMsg) != 0) return -1;
if (mndCheckRequestValid(pMsg) != 0) return -1;
SMnode *pMnode = pMsg->info.node;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
if (fp == NULL) { if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
return -1; return -1;
} }
mTrace("msg:%p, will be processed in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
int32_t code = (*fp)(pMsg); int32_t code = (*fp)(pMsg);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
terrno = code; mTrace("msg:%p, won't response immediately since in progress", pMsg);
mTrace("msg:%p, in progress, app:%p", pMsg, ahandle); } else if (code == 0) {
} else if (code != 0) { mTrace("msg:%p, successfully processed and response", pMsg);
if (terrno != TSDB_CODE_OPS_NOT_SUPPORT) {
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
} else { } else {
mTrace("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
} }
} else {
mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
}
return code; return code;
} }
......
...@@ -948,9 +948,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -948,9 +948,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pResp->contLen == 0) { if (pResp->contLen == 0) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
} else { } else {
SMEpSet emsg = {0}; SEpSet epSet = {0};
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
pCtx->epSet = emsg.epSet; pCtx->epSet = epSet;
} }
addConnToPool(pThrd->pool, pConn); addConnToPool(pThrd->pool, pConn);
tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1, tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册