未验证 提交 458cd4d1 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13087 from taosdata/fix/mnode

fix: error in sync sem
...@@ -64,6 +64,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -64,6 +64,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
qWorkerProcessFetchRsp(NULL, NULL, pRpc); qWorkerProcessFetchRsp(NULL, NULL, pRpc);
return; return;
} else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) {
dmSetMnodeEpSet(&pDnode->data, pEpSet);
} else { } else {
} }
......
...@@ -326,6 +326,7 @@ void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet ...@@ -326,6 +326,7 @@ void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet
} }
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
if (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) == 0) return;
taosThreadRwlockWrlock(&pData->lock); taosThreadRwlockWrlock(&pData->lock);
pData->mnodeEps = *pEpSet; pData->mnodeEps = *pEpSet;
taosThreadRwlockUnlock(&pData->lock); taosThreadRwlockUnlock(&pData->lock);
......
...@@ -124,6 +124,11 @@ typedef enum { ...@@ -124,6 +124,11 @@ typedef enum {
TRN_POLICY_RETRY = 1, TRN_POLICY_RETRY = 1,
} ETrnPolicy; } ETrnPolicy;
typedef enum {
TRN_EXEC_PARALLEL = 0,
TRN_EXEC_ONE_BY_ONE = 1,
} ETrnExecType;
typedef enum { typedef enum {
DND_REASON_ONLINE = 0, DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT, DND_REASON_STATUS_MSG_TIMEOUT,
...@@ -152,6 +157,7 @@ typedef struct { ...@@ -152,6 +157,7 @@ typedef struct {
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
ETrnType type; ETrnType type;
ETrnExecType parallel;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
SRpcHandleInfo rpcInfo; SRpcHandleInfo rpcInfo;
......
...@@ -81,6 +81,7 @@ typedef struct { ...@@ -81,6 +81,7 @@ typedef struct {
bool standby; bool standby;
bool restored; bool restored;
int32_t errCode; int32_t errCode;
int32_t transId;
} SSyncMgmt; } SSyncMgmt;
typedef struct { typedef struct {
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
int32_t mndInitSync(SMnode *pMnode); int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode);
bool mndIsMaster(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode);
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId);
void mndSyncStart(SMnode *pMnode); void mndSyncStart(SMnode *pMnode);
void mndSyncStop(SMnode *pMnode); void mndSyncStop(SMnode *pMnode);
......
...@@ -57,6 +57,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); ...@@ -57,6 +57,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetExecOneByOne(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp); void mndTransProcessRsp(SRpcMsg *pRsp);
......
...@@ -313,16 +313,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -313,16 +313,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
{ {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
STransAction action = { STransAction action = {
.epSet = alterEpset, .epSet = createEpset,
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
.msgType = TDMT_DND_ALTER_MNODE, .msgType = TDMT_DND_CREATE_MNODE,
.acceptableCode = 0, .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
}; };
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
...@@ -332,16 +332,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -332,16 +332,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
} }
{ {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &createReq); tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
STransAction action = { STransAction action = {
.epSet = createEpset, .epSet = alterEpset,
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
.msgType = TDMT_DND_CREATE_MNODE, .msgType = TDMT_DND_ALTER_MNODE,
.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED, .acceptableCode = 0,
}; };
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
...@@ -365,6 +365,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, ...@@ -365,6 +365,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mndTransSetExecOneByOne(pTrans);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
...@@ -536,7 +537,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { ...@@ -536,7 +537,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mndTransSetExecOneByOne(pTrans);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
...@@ -701,14 +702,17 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { ...@@ -701,14 +702,17 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
} }
} }
mTrace("trans:-1, sync reconfig will be proposed");
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->standby = 0; pMgmt->standby = 0;
int32_t code = syncReconfig(pMgmt->sync, &cfg); int32_t code = syncReconfig(pMgmt->sync, &cfg);
if (code != 0) { if (code != 0) {
mError("failed to alter mnode sync since %s", terrstr()); mError("trans:-1, failed to propose sync reconfig since %s", terrstr());
return code; return code;
} else { } else {
pMgmt->errCode = 0; pMgmt->errCode = 0;
pMgmt->transId = -1;
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode)); mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode));
terrno = pMgmt->errCode; terrno = pMgmt->errCode;
......
...@@ -507,6 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -507,6 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb); mndTransSetDbInfo(pTrans, pDb);
mndTransSetExecOneByOne(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
......
...@@ -28,16 +28,26 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { ...@@ -28,16 +28,26 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSdbRaw *pRaw = pMsg->pCont; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " term:%" PRId64 " role:%s", pRaw, cbMeta.index, cbMeta.term,
syncStr(cbMeta.state)); int32_t transId = sdbGetIdFromRaw(pRaw);
sdbWriteWithoutFree(pMnode->pSdb, pRaw); pMgmt->errCode = cbMeta.code;
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId,
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMnode->syncMgmt.syncSem); if (pMgmt->errCode == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
}
if (pMgmt->transId == transId) {
if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
}
tsem_post(&pMgmt->syncSem);
} }
} }
...@@ -78,11 +88,19 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char ...@@ -78,11 +88,19 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char
} }
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%" PRId64 ", cbMeta.term:%" PRId64 ", cbMeta.index:%" PRId64, SMnode *pMnode = pFsm->data;
cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index); SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SMnode *pMnode = pFsm->data;
pMnode->syncMgmt.errCode = cbMeta.code; pMgmt->errCode = cbMeta.code;
tsem_post(&pMnode->syncMgmt.syncSem); mInfo("trans:-1, sync reconfig is proposed, savedTransId:%d code:0x%x, curTerm:%" PRId64 " term:%" PRId64,
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);
if (pMgmt->transId == -1) {
if (pMgmt->errCode != 0) {
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
}
tsem_post(&pMgmt->syncSem);
}
} }
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
...@@ -165,15 +183,17 @@ void mndCleanupSync(SMnode *pMnode) { ...@@ -165,15 +183,17 @@ void mndCleanupSync(SMnode *pMnode) {
memset(pMgmt, 0, sizeof(SSyncMgmt)); memset(pMgmt, 0, sizeof(SSyncMgmt));
} }
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0; SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
rsp.pCont = rpcMallocCont(rsp.contLen); rsp.pCont = rpcMallocCont(rsp.contLen);
if (rsp.pCont == NULL) return -1; if (rsp.pCont == NULL) return -1;
memcpy(rsp.pCont, pRaw, rsp.contLen); memcpy(rsp.pCont, pRaw, rsp.contLen);
pMgmt->errCode = 0;
pMgmt->transId = transId;
mTrace("trans:%d, will be proposed", pMgmt->transId);
const bool isWeak = false; const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
if (code == 0) { if (code == 0) {
...@@ -187,7 +207,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -187,7 +207,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
} }
rpcFreeCont(rsp.pCont); rpcFreeCont(rsp.pCont);
if (code != 0) return code; if (code != 0) {
mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code);
return code;
}
return pMgmt->errCode; return pMgmt->errCode;
} }
......
...@@ -140,6 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -140,6 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT16(pRaw, dataPos, stage, _OVER) SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
...@@ -245,12 +246,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -245,12 +246,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int16_t stage = 0; int16_t stage = 0;
int16_t policy = 0; int16_t policy = 0;
int16_t type = 0; int16_t type = 0;
int16_t parallel = 0;
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
SDB_GET_INT16(pRaw, dataPos, &type, _OVER) SDB_GET_INT16(pRaw, dataPos, &type, _OVER)
SDB_GET_INT16(pRaw, dataPos, &parallel, _OVER)
pTrans->stage = stage; pTrans->stage = stage;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->type = type; pTrans->type = type;
pTrans->parallel = parallel;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
...@@ -665,6 +669,8 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { ...@@ -665,6 +669,8 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
} }
void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
...@@ -674,7 +680,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -674,7 +680,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("trans:%d, sync to other nodes", pTrans->id); mDebug("trans:%d, sync to other nodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -970,7 +976,18 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -970,7 +976,18 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgSent) continue;
if (pAction->msgSent) {
if (pAction->msgReceived) {
continue;
} else {
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
} else {
continue;
}
}
}
int64_t signature = pTrans->id; int64_t signature = pTrans->id;
signature = (signature << 32); signature = (signature << 32);
...@@ -990,6 +1007,9 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -990,6 +1007,9 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
}
} else { } else {
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
......
...@@ -386,6 +386,8 @@ SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len); ...@@ -386,6 +386,8 @@ SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len);
const char *sdbTableName(ESdbType type); const char *sdbTableName(ESdbType type);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
int32_t sdbGetIdFromRaw(SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdb.h" #include "sdb.h"
int32_t sdbGetIdFromRaw(SSdbRaw *pRaw) {
int32_t id = *((int32_t *)(pRaw->pData));
return id;
}
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
SSdbRaw *pRaw = taosMemoryCalloc(1, dataLen + sizeof(SSdbRaw)); SSdbRaw *pRaw = taosMemoryCalloc(1, dataLen + sizeof(SSdbRaw));
if (pRaw == NULL) { if (pRaw == NULL) {
......
...@@ -60,7 +60,9 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, ...@@ -60,7 +60,9 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
return; return;
} }
} }
assert(0);
// maybe config change
// assert(0);
} }
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
......
...@@ -981,6 +981,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -981,6 +981,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
} }
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) { void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
pSyncNode->pRaftCfg->cfg = *newConfig; pSyncNode->pRaftCfg->cfg = *newConfig;
int32_t ret = 0; int32_t ret = 0;
...@@ -1014,6 +1015,15 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro ...@@ -1014,6 +1015,15 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro
// isDrop // isDrop
*isDrop = true; *isDrop = true;
bool IamInOld, IamInNew;
for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
*isDrop = false;
break;
}
}
for (int i = 0; i < newConfig->replicaNum; ++i) { for (int i = 0; i < newConfig->replicaNum; ++i) {
if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
......
...@@ -55,7 +55,7 @@ ...@@ -55,7 +55,7 @@
./test.sh -f tsim/bnode/basic1.sim ./test.sh -f tsim/bnode/basic1.sim
# ---- mnode # ---- mnode
#./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim ./test.sh -f tsim/mnode/basic2.sim
# ---- show # ---- show
......
...@@ -88,7 +88,7 @@ sql show mnodes ...@@ -88,7 +88,7 @@ sql show mnodes
print $data(1)[0] $data(1)[1] $data(1)[2] print $data(1)[0] $data(1)[1] $data(1)[2]
print $data(2)[0] $data(2)[1] $data(2)[2] print $data(2)[0] $data(2)[1] $data(2)[2]
if $rows != 2 then if $rows != 1 then
return -1 return -1
endi endi
if $data(1)[0] != 1 then if $data(1)[0] != 1 then
...@@ -97,16 +97,16 @@ endi ...@@ -97,16 +97,16 @@ endi
if $data(1)[2] != LEADER then if $data(1)[2] != LEADER then
return -1 return -1
endi endi
if $data(2)[0] != NULL then if $data(2)[0] != null then
goto step2 goto step2
endi endi
if $data(2)[2] != NULL then if $data(2)[2] != null then
goto step2 goto step2
endi endi
sleep 2000 sleep 2000
print =============== create drop mnodes print =============== create mnodes
sql create mnode on dnode 2 sql create mnode on dnode 2
sql show mnodes sql show mnodes
if $rows != 2 then if $rows != 2 then
......
...@@ -119,9 +119,16 @@ if $data(2)[4] != ready then ...@@ -119,9 +119,16 @@ if $data(2)[4] != ready then
endi endi
print =============== insert data print =============== insert data
#sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd" sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
#sql create table db.ctb using db.stb tags(101, 102, "103") sql create table db.ctb using db.stb tags(101, 102, "103")
#sql insert into db.ctb values(now, 1, "2") sql insert into db.ctb values(now, 1, "2")
sql select * from db.ctb
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode2 -s stop system sh/exec.sh -n dnode2 -s stop
\ No newline at end of file
...@@ -15,7 +15,7 @@ $x = 0 ...@@ -15,7 +15,7 @@ $x = 0
step1: step1:
$x = $x + 1 $x = $x + 1
sleep 1000 sleep 1000
if $x == 20 then if $x == 50 then
return -1 return -1
endi endi
sql show dnodes -x step1 sql show dnodes -x step1
...@@ -37,7 +37,7 @@ $x = 0 ...@@ -37,7 +37,7 @@ $x = 0
step2: step2:
$x = $x + 1 $x = $x + 1
sleep 1000 sleep 1000
if $x == 20 then if $x == 50 then
return -1 return -1
endi endi
sql show mnodes -x step2 sql show mnodes -x step2
...@@ -68,7 +68,7 @@ $x = 0 ...@@ -68,7 +68,7 @@ $x = 0
step4: step4:
$x = $x + 1 $x = $x + 1
sleep 1000 sleep 1000
if $x == 20 then if $x == 50 then
return -1 return -1
endi endi
sql show mnodes -x step4 sql show mnodes -x step4
...@@ -98,7 +98,7 @@ $x = 0 ...@@ -98,7 +98,7 @@ $x = 0
step5: step5:
$x = $x + 1 $x = $x + 1
sleep 1000 sleep 1000
if $x == 20 then if $x == 50 then
return -1 return -1
endi endi
sql show mnodes -x step5 sql show mnodes -x step5
...@@ -119,7 +119,7 @@ $x = 0 ...@@ -119,7 +119,7 @@ $x = 0
step6: step6:
$x = $x + 1 $x = $x + 1
sleep 1000 sleep 1000
if $x == 20 then if $x == 50 then
return -1 return -1
endi endi
sql show mnodes -x step6 sql show mnodes -x step6
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册