提交 d137ce1c 编写于 作者: S Shengliang Guan

refactor: create mnode handle

上级 28f6dcad
...@@ -1273,7 +1273,6 @@ int32_t tSerializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnod ...@@ -1273,7 +1273,6 @@ int32_t tSerializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnod
int32_t tDeserializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq); int32_t tDeserializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
typedef struct { typedef struct {
int32_t dnodeId;
int8_t replica; int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
} SDCreateMnodeReq, SDAlterMnodeReq; } SDCreateMnodeReq, SDAlterMnodeReq;
......
...@@ -3186,7 +3186,6 @@ int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq * ...@@ -3186,7 +3186,6 @@ int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
...@@ -3204,7 +3203,6 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq ...@@ -3204,7 +3203,6 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
......
...@@ -79,7 +79,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -79,7 +79,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (createReq.replica <= 1 || (createReq.dnodeId != pInput->pData->dnodeId && pInput->pData->dnodeId != 0)) { if (createReq.replica != 1) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
......
...@@ -3,7 +3,7 @@ if(${BUILD_TEST}) ...@@ -3,7 +3,7 @@ if(${BUILD_TEST})
add_subdirectory(qnode) add_subdirectory(qnode)
add_subdirectory(bnode) add_subdirectory(bnode)
add_subdirectory(snode) add_subdirectory(snode)
add_subdirectory(mnode) #add_subdirectory(mnode)
add_subdirectory(vnode) add_subdirectory(vnode)
add_subdirectory(sut) add_subdirectory(sut)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -267,75 +267,83 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod ...@@ -267,75 +267,83 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod
} }
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
int32_t numOfReplicas = 0; int32_t numOfReplicas = 0;
SDAlterMnodeReq alterReq = {0};
SDCreateMnodeReq createReq = {0}; SDCreateMnodeReq createReq = {0};
SEpSet alterEpset = {0};
SEpSet createEpset = {0};
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
SReplica *pReplica = &createReq.replicas[numOfReplicas]; alterReq.replicas[numOfReplicas].id = pMObj->id;
pReplica->id = pMObj->id; alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
pReplica->port = pMObj->pDnode->port; memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++;
alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
alterEpset.inUse = numOfReplicas;
}
numOfReplicas++;
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
} }
SReplica *pReplica = &createReq.replicas[numOfReplicas]; alterReq.replica = numOfReplicas + 1;
pReplica->id = pDnode->id; alterReq.replicas[numOfReplicas].id = pDnode->id;
pReplica->port = pDnode->port; alterReq.replicas[numOfReplicas].port = pDnode->port;
memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(alterReq.replicas[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++;
createReq.replica = numOfReplicas; alterEpset.numOfEps = numOfReplicas + 1;
alterEpset.eps[numOfReplicas].port = pDnode->port;
memcpy(alterEpset.eps[numOfReplicas].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
while (1) { createReq.replica = 1;
SMnodeObj *pMObj = NULL; createReq.replicas[0].id = pDnode->id;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); createReq.replicas[0].port = pDnode->port;
if (pIter == NULL) break; memcpy(createReq.replicas[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
STransAction action = {0}; createEpset.numOfEps = 1;
createEpset.eps[0].port = pDnode->port;
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
createReq.dnodeId = pMObj->id; {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &createReq); tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
action.epSet = mndGetDnodeEpset(pMObj->pDnode); STransAction action = {
action.pCont = pReq; .epSet = alterEpset,
action.contLen = contLen; .pCont = pReq,
action.msgType = TDMT_DND_ALTER_MNODE; .contLen = contLen,
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; .msgType = TDMT_DND_ALTER_MNODE,
.acceptableCode = 0,
};
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj);
return -1; return -1;
} }
sdbRelease(pSdb, pMObj);
} }
{ {
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
createReq.dnodeId = pObj->id;
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &createReq); tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
action.epSet = mndGetDnodeEpset(pDnode); STransAction action = {
action.pCont = pReq; .epSet = createEpset,
action.contLen = contLen; .pCont = pReq,
action.msgType = TDMT_DND_CREATE_MNODE; .contLen = contLen,
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED; .msgType = TDMT_DND_CREATE_MNODE,
.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
};
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
...@@ -441,73 +449,77 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO ...@@ -441,73 +449,77 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
} }
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
int32_t numOfReplicas = 0; int32_t numOfReplicas = 0;
SDAlterMnodeReq alterReq = {0}; SDAlterMnodeReq alterReq = {0};
SDDropMnodeReq dropReq = {0};
SEpSet alterEpset = {0};
SEpSet dropEpSet = {0};
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pMObj->id == pObj->id) {
sdbRelease(pSdb, pMObj);
continue;
}
alterReq.replicas[numOfReplicas].id = pMObj->id;
alterReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
memcpy(alterReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
if (pMObj->id != pObj->id) { alterEpset.eps[numOfReplicas].port = pMObj->pDnode->port;
SReplica *pReplica = &alterReq.replicas[numOfReplicas]; memcpy(alterEpset.eps[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
pReplica->id = pMObj->id; if (pMObj->state == TAOS_SYNC_STATE_LEADER) {
pReplica->port = pMObj->pDnode->port; alterEpset.inUse = numOfReplicas;
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++;
} }
numOfReplicas++;
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
} }
alterReq.replica = numOfReplicas; alterReq.replica = numOfReplicas;
alterEpset.numOfEps = numOfReplicas;
while (1) { dropReq.dnodeId = pDnode->id;
SMnodeObj *pMObj = NULL; dropEpSet.numOfEps = 1;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); dropEpSet.eps[0].port = pDnode->port;
if (pIter == NULL) break; memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
if (pMObj->id != pObj->id) {
STransAction action = {0};
alterReq.dnodeId = pMObj->id;
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_MNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj);
return -1;
}
}
sdbRelease(pSdb, pMObj); {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
STransAction action = {
.epSet = alterEpset,
.pCont = pReq,
.contLen = contLen,
.msgType = TDMT_DND_ALTER_MNODE,
.acceptableCode = 0,
};
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
} }
{ {
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
SDDropMnodeReq dropReq = {0};
dropReq.dnodeId = pObj->id;
int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq); int32_t contLen = tSerializeSCreateDropMQSBNodeReq(NULL, 0, &dropReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq); tSerializeSCreateDropMQSBNodeReq(pReq, contLen, &dropReq);
action.epSet = mndGetDnodeEpset(pDnode); STransAction action = {
action.pCont = pReq; .epSet = dropEpSet,
action.contLen = contLen; .pCont = pReq,
action.msgType = TDMT_DND_DROP_MNODE; .contLen = contLen,
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; .msgType = TDMT_DND_DROP_MNODE,
.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
};
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
...@@ -662,7 +674,7 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) { ...@@ -662,7 +674,7 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
} }
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SDAlterMnodeReq alterReq = {0}; SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
...@@ -670,12 +682,6 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { ...@@ -670,12 +682,6 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
return -1; return -1;
} }
if (alterReq.dnodeId != pMnode->selfDnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
mError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMnode->selfDnodeId);
return -1;
}
SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1}; SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1};
for (int32_t i = 0; i < alterReq.replica; ++i) { for (int32_t i = 0; i < alterReq.replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i]; SNodeInfo *pNode = &cfg.nodeInfo[i];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册