未验证 提交 eec706d7 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #12075 from taosdata/feature/3.0_mhli

enh: not leader retry
......@@ -42,6 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp);
typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc);
......@@ -52,6 +53,7 @@ typedef struct {
GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp;
SendRspFp sendRspFp;
SendRedirectRspFp sendRedirectRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
ReportStartup reportStartupFp;
......@@ -62,6 +64,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
void tmsgSendRsp(const SRpcMsg* pRsp);
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(void* handle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc);
......
......@@ -89,7 +89,7 @@ typedef struct SSyncFSM {
struct SSyncRaftEntry;
typedef struct SSyncRaftEntry SSyncRaftEntry;
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
// abstract definition of log store in raft
......@@ -149,6 +149,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
ESyncState syncGetMyRole(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid);
typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0,
......
......@@ -34,6 +34,10 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
return (*tsDefaultMsgCb.sendRedirectRspFp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
}
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
}
......
......@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
......@@ -130,10 +130,10 @@ _OVER:
}
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeTrans * pTrans = &pDnode->trans;
SDnodeTrans *pTrans = &pDnode->trans;
tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u;
SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
if (msgType == TDMT_DND_SERVER_STATUS) {
......@@ -320,6 +320,37 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
}
}
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
ASSERT(pRsp->pCont == NULL);
if (pWrapper->procType != DND_PROC_CHILD) {
SRpcMsg resp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
resp.pCont = rpcMallocCont(len);
resp.contLen = len;
tSerializeSMEpSet(resp.pCont, len, &msg);
resp.code = TSDB_CODE_RPC_REDIRECT;
resp.handle = pRsp->handle;
resp.refId = pRsp->refId;
rpcSendResponse(&resp);
} else {
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
}
#if 0
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
if (pWrapper->procType != DND_PROC_CHILD) {
rpcSendRedirectRsp(pRsp->handle, pNewEpSet);
} else {
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
}
#endif
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType != DND_PROC_CHILD) {
rpcRegisterBrokenLinkArg(pMsg);
......@@ -406,6 +437,14 @@ SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) {
return cfg;
}
bool rpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
static int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
......@@ -420,6 +459,7 @@ static int32_t dmInitClient(SDnode *pDnode) {
rpcInit.ckey = INTERNAL_CKEY;
rpcInit.spi = 1;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
......@@ -477,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void * pReq = rpcMallocCont(contLen);
void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......@@ -551,6 +591,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = {
.sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp,
.sendRedirectRspFp = dmSendRedirectRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartupByWrapper,
......
......@@ -149,8 +149,15 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
rsp.code = TSDB_CODE_SYN_NOT_LEADER;
tmsgSendRsp(&rsp);
// rsp.code = TSDB_CODE_SYN_NOT_LEADER;
// tmsgSendRsp(&rsp);
dTrace("syncPropose not leader redirect, vgId:%d ", syncGetVgId(vnodeGetSyncHandle(pVnode->pImpl)));
rsp.code = TSDB_CODE_RPC_REDIRECT;
SEpSet newEpSet;
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp);
......
......@@ -37,7 +37,7 @@ bool syncEnvIsStart() {
int32_t syncEnvStart() {
int32_t ret = 0;
taosSeedRand(taosGetTimestampSec());
//gSyncEnv = doSyncEnvStart(gSyncEnv);
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv = doSyncEnvStart();
assert(gSyncEnv != NULL);
sTrace("syncEnvStart ok!");
......@@ -97,14 +97,14 @@ static SSyncEnv *doSyncEnvStart() {
// start tmr thread
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
atomic_store_8(&(pSyncEnv->isStart), 1);
atomic_store_8(&(pSyncEnv->isStart), 1);
return pSyncEnv;
}
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
assert(pSyncEnv == gSyncEnv);
if (pSyncEnv != NULL) {
atomic_store_8(&(pSyncEnv->isStart), 0);
atomic_store_8(&(pSyncEnv->isStart), 0);
taosTmrCleanUp(pSyncEnv->pTimerManager);
taosMemoryFree(pSyncEnv);
}
......
......@@ -141,6 +141,18 @@ const char* syncGetMyRoleStr(int64_t rid) {
return s;
}
int32_t syncGetVgId(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
assert(rid == pSyncNode->rid);
int32_t vgId = pSyncNode->vgId;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return vgId;
}
SyncTerm syncGetMyTerm(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -153,6 +165,29 @@ SyncTerm syncGetMyTerm(int64_t rid) {
return term;
}
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
memset(pEpSet, 0, sizeof(*pEpSet));
return;
}
assert(rid == pSyncNode->rid);
pEpSet->numOfEps = 0;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
(pEpSet->numOfEps)++;
sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -260,6 +295,8 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
sTrace("syncPropose msgType:%d ", pMsg->msgType);
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -459,18 +496,17 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if (pSyncNode->replicaNum == 1) {
syncNodeBecomeLeader(pSyncNode);
syncNodeLog2("==state change become leader immediately==", pSyncNode);
// Raft 3.6.2 Committing entries from previous terms
// use this now
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
return;
}
syncNodeBecomeFollower(pSyncNode);
// for test
......
......@@ -936,8 +936,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
pCtx->epSet = emsg.epSet;
}
addConnToPool(pThrd, pConn);
tTrace("use remote epset, current in use: %d, retry count%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
addConnToPool(pThrd->pool, pConn);
tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
TRANS_RETRY_COUNT_LIMIT);
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册