未验证 提交 675fe706 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13100 from taosdata/fix/mnode

fix: return mnode epset while master changed
......@@ -479,12 +479,8 @@ int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
typedef struct {
SEpSet epSet;
} SMEpSet;
int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq);
int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq);
int32_t tSerializeSEpSet(void* buf, int32_t bufLen, const SEpSet* pEpset);
int32_t tDeserializeSEpSet(void* buf, int32_t buflen, SEpSet* pEpset);
typedef struct {
int8_t connType;
......
......@@ -665,22 +665,24 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL;
}
int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) return -1;
if (tEncodeSEpSet(&encoder, pEpset) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
int32_t tDeserializeSEpSet(void *buf, int32_t bufLen, SEpSet *pEpset) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) return -1;
if (tDecodeSEpSet(&decoder, pEpset) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -206,29 +206,28 @@ static inline void dmSendRsp(SRpcMsg *pMsg) {
}
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SMEpSet msg = {0};
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &msg.epSet);
SEpSet epSet = {0};
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->pCont = rpcMallocCont(contLen);
if (pMsg->pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSMEpSet(pMsg->pCont, contLen, &msg);
tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
pMsg->contLen = contLen;
}
}
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
......
......@@ -20,6 +20,7 @@
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndSync.h"
#define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64
......@@ -222,23 +223,24 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
}
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
SSdb *pSdb = pMnode->pSdb;
pEpSet->numOfEps = 0;
SSdb *pSdb = pMnode->pSdb;
int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
void *pIter = NULL;
void *pIter = NULL;
while (1) {
SMnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
if (pObj->pDnode == NULL) {
mError("mnode:%d, no corresponding dnode exists", pObj->id);
} else {
if (pObj->id == pMnode->selfDnodeId || pObj->state == TAOS_SYNC_STATE_LEADER) {
if (pObj->id == pMnode->selfDnodeId) {
if (mndIsMaster(pMnode)) {
pEpSet->inUse = pEpSet->numOfEps;
} else {
pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
}
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj);
}
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj);
}
}
......
......@@ -236,6 +236,17 @@ void mndSyncStop(SMnode *pMnode) {}
bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
ESyncState state = syncGetMyRole(pMgmt->sync);
return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
if (state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
return false;
}
if (!pMgmt->restored) {
terrno = TSDB_CODE_APP_NOT_READY;
return false;
}
return true;
}
......@@ -408,46 +408,74 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
return code;
}
int32_t mndProcessMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
void *ahandle = pMsg->info.ahandle;
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
if (IsReq(pMsg)) {
if (!mndIsMaster(pMnode)) {
terrno = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
return -1;
}
static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (mndIsMaster(pMsg->info.node)) return 0;
if (pMsg->contLen == 0 || pMsg->pCont == NULL) {
terrno = TSDB_CODE_INVALID_MSG_LEN;
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
return -1;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER) {
return -1;
}
mError("msg:%p, failed to check master since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet);
#if 0
mTrace("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
mTrace("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
}
#endif
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen);
if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.rspLen = contLen;
terrno = TSDB_CODE_RPC_REDIRECT;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
return -1;
}
static int32_t mndCheckRequestValid(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
mError("msg:%p, failed to valid request, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN;
return -1;
}
int32_t mndProcessMsg(SRpcMsg *pMsg) {
if (mndCheckMnodeMaster(pMsg) != 0) return -1;
if (mndCheckRequestValid(pMsg) != 0) return -1;
SMnode *pMnode = pMsg->info.node;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
return -1;
}
mTrace("msg:%p, will be processed in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
int32_t code = (*fp)(pMsg);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
terrno = code;
mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
} else if (code != 0) {
if (terrno != TSDB_CODE_OPS_NOT_SUPPORT) {
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
} else {
mTrace("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
}
mTrace("msg:%p, won't response immediately since in progress", pMsg);
} else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg);
} else {
mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
return code;
}
......
......@@ -975,9 +975,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pResp->contLen == 0) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
} else {
SMEpSet emsg = {0};
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
pCtx->epSet = emsg.epSet;
SEpSet epSet = {0};
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
pCtx->epSet = epSet;
}
addConnToPool(pThrd->pool, pConn);
tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册