提交 7fcfd45a 编写于 作者: S Shengliang Guan

feat: increase the number of database replicas

上级 c388099e
......@@ -1002,7 +1002,6 @@ typedef struct {
typedef struct {
int32_t vgId;
int32_t dnodeId;
char db[TSDB_DB_FNAME_LEN];
int64_t dbUid;
int32_t vgVersion;
......@@ -1025,15 +1024,13 @@ typedef struct {
int8_t compression;
int8_t strict;
int8_t cacheLastRow;
int8_t isTsma;
int8_t standby;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
// for tsma
int8_t isTsma;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......@@ -1071,8 +1068,8 @@ typedef struct {
int8_t walLevel;
int8_t strict;
int8_t cacheLastRow;
int8_t replica;
int8_t selfIndex;
int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA];
} SAlterVnodeReq;
......
......@@ -221,9 +221,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT_VNODE, "vnode-compact-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL)
// Requests handled by QNODE
TD_NEW_MSG_SEG(TDMT_QND_MSG)
......
......@@ -2934,7 +2934,6 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
......@@ -2957,6 +2956,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1;
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
......@@ -2972,7 +2973,6 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2986,7 +2986,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
......@@ -3009,6 +3008,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
......@@ -3035,8 +3036,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
}
}
if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......@@ -3123,8 +3122,8 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
......@@ -3154,8 +3153,8 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
......
......@@ -219,9 +219,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
......
......@@ -156,13 +156,14 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod;
pCfg->standby = pCfg->standby;
pCfg->syncCfg.myIndex = pCreate->selfIndex;
pCfg->syncCfg.replicaNum = pCreate->replica;
memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
for (int i = 0; i < pCreate->replica; ++i) {
pCfg->syncCfg.nodeInfo[i].nodePort = pCreate->replicas[i].port;
snprintf(pCfg->syncCfg.nodeInfo[i].nodeFqdn, sizeof(pCfg->syncCfg.nodeInfo[i].nodeFqdn), "%s",
pCreate->replicas[i].fqdn);
SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
pNode->nodePort = pCreate->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pCreate->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
}
}
......@@ -175,6 +176,8 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SCreateVnodeReq createReq = {0};
SVnodeCfg vnodeCfg = {0};
SWrapperCfg wrapperCfg = {0};
int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0};
......@@ -183,12 +186,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dDebug("vgId:%d, create vnode req is received, tsma:%d", createReq.vgId, createReq.isTsma);
SVnodeCfg vnodeCfg = {0};
dDebug("vgId:%d, create vnode req is received, tsma:%d standby:%d", createReq.vgId, createReq.isTsma,
createReq.standby);
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
SWrapperCfg wrapperCfg = {0};
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
......@@ -313,8 +313,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
......@@ -36,7 +36,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode);
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
......@@ -261,7 +261,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb);
}
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
......@@ -270,7 +270,7 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby);
if (pReq == NULL) return -1;
action.pCont = pReq;
......@@ -286,7 +286,7 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0;
}
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
......@@ -296,7 +296,7 @@ static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_VNODE;
action.msgType = msgType;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -467,7 +467,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid) != 0) {
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false) != 0) {
return -1;
}
}
......@@ -688,29 +688,37 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup) != 0) {
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_CONFIG) != 0) {
return -1;
}
} else {
SVgObj newVgroup = {0};
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
mndTransSetSerial(pTrans);
if (newVgroup.replica < pDb->cfg.replications) {
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
......@@ -720,9 +728,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
} else {
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
......@@ -733,15 +741,18 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
}
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
}
return 0;
......@@ -774,18 +785,16 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
}
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
if (pTrans == NULL) return -1;
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
int32_t code = -1;
mndTransSetDbName(pTrans, pOld->name);
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
......
......@@ -421,7 +421,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
// todo add sma info here
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, false);
if (pReq == NULL) return -1;
action.pCont = pReq;
......
......@@ -51,9 +51,10 @@ int32_t mndInitVgroup(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndProcessCompactVnodeRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
......@@ -188,10 +189,10 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup);
}
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen,
bool standby) {
SCreateVnodeReq createReq = {0};
createReq.vgId = pVgroup->vgId;
createReq.dnodeId = pDnode->id;
memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
createReq.dbUid = pDb->uid;
createReq.vgVersion = pVgroup->version;
......@@ -218,6 +219,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.hashMethod = pDb->cfg.hashMethod;
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
createReq.pRetensions = pDb->cfg.pRetensions;
createReq.standby = standby;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v];
......@@ -274,7 +276,6 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
alterReq.strict = pDb->cfg.strict;
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
alterReq.replica = pVgroup->replica;
alterReq.selfIndex = -1;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &alterReq.replicas[v];
......@@ -290,13 +291,6 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
mndReleaseDnode(pMnode, pVgidDnode);
}
#if 0
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
#endif
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -508,7 +502,7 @@ _OVER:
taosArrayDestroy(pArray);
return code;
}
//--->
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
......@@ -536,7 +530,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[maxPos];
pVgid->dnodeId = pDnode->id;
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
pVgid->role = TAOS_SYNC_STATE_ERROR;
pDnode->numOfVnodes++;
mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
......@@ -547,16 +541,15 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
//--->
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2) {
int32_t removedNum = 0;
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SDnodeObj *pDnode = taosArrayGet(pArray, i);
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
}
int32_t removedNum = 0;
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
SDnodeObj *pDnode = taosArrayGet(pArray, d);
......@@ -662,6 +655,7 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
int32_t cols = 0;
int64_t curMs = taosGetTimestampMs();
SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) {
......@@ -701,12 +695,15 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
if (i < pVgroup->replica) {
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);
char buf1[20] = {0};
SDnodeObj *pDnodeObj = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
ASSERT(pDnodeObj != NULL);
bool isOffLine = !mndIsDnodeOnline(pMnode, pDnodeObj, taosGetTimestampMs());
const char *role = isOffLine ? "OFFLINE" : syncStr(pVgroup->vnodeGid[i].role);
bool online = false;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
if (pDnode != NULL) {
online = mndIsDnodeOnline(pMnode, pDnode, curMs);
mndReleaseDnode(pMnode, pDnode);
}
char buf1[20] = {0};
const char *role = online ? syncStr(pVgroup->vnodeGid[i].role) : "OFFLINE";
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
......@@ -177,6 +177,7 @@ struct SVnodeCfg {
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
int8_t standby;
};
typedef struct {
......
......@@ -84,6 +84,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg);
#ifdef __cplusplus
}
......
......@@ -22,7 +22,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
SDecoder dc = {0};
......@@ -88,6 +88,9 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
}
} break;
case TDMT_VND_ALTER_REPLICA: {
vnodeSyncAlter(pVnode, pMsg);
} break;
default:
break;
}
......@@ -154,7 +157,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_ALTER_VNODE:
case TDMT_VND_ALTER_CONFIG:
break;
default:
ASSERT(0);
......@@ -784,7 +787,7 @@ _exit:
return 0;
}
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateTSmaReq req = {0};
SDecoder coder;
......
......@@ -27,6 +27,7 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo = {
.vgId = pVnode->config.vgId,
.isStandBy = pVnode->config.standby,
.syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal,
.msgcb = NULL,
......@@ -49,9 +50,38 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return 0;
}
void vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
SAlterVnodeReq req = {0};
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
vError("vgId:%d, failed to alter replica since %s", TD_VID(pVnode), terrstr());
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
tmsgSendRsp(&rsp);
}
vInfo("vgId:%d, start to alter vnode replica to %d", TD_VID(pVnode), req.replica);
SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
for (int32_t r = 0; r < req.replica; ++r) {
SNodeInfo *pNode = &cfg.nodeInfo[r];
tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = req.replicas[r].port;
vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
}
if (syncReconfig(pVnode->sync, &cfg) != 0) {
vError("vgId:%d, failed to propose sync reconfig since %s", TD_VID(pVnode), terrstr());
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
tmsgSendRsp(&rsp);
}
}
void vnodeSyncStart(SVnode *pVnode) {
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
if (pVnode->config.standby) {
syncStartStandBy(pVnode->sync);
} else {
syncStart(pVnode->sync);
}
}
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
......@@ -60,17 +90,32 @@ int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
}
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t code = tmsgSendReq(pEpSet, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
}
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0;
}
void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode));
// todo rpc response here
}
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) {
......@@ -87,20 +132,12 @@ void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta)
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SVnode *pVnode = (SVnode *)(pFsm->data);
SVnode *pVnode = pFsm->data;
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
SRpcMsg applyMsg;
syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
syncApplyMsgDestroy(pSyncApplyMsg);
/*
SRpcMsg applyMsg;
applyMsg = *pMsg;
applyMsg.pCont = rpcMallocCont(applyMsg.contLen);
assert(applyMsg.contLen == pMsg->contLen);
memcpy(applyMsg.pCont, pMsg->pCont, applyMsg.contLen);
*/
// recover handle for response
SRpcMsg saveRpcMsg;
int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
......@@ -142,14 +179,13 @@ void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm));
pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL;
pFsm->FpReConfigCb = NULL;
pFsm->FpReConfigCb = vnodeSyncReconfig;
return pFsm;
}
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
print =============== step1: create dnodes
sql create dnode $hostname port 7200
$loop_cnt = 0
step1:
$loop_cnt = $loop_cnt + 1
sleep 1000
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print =============== step2: create database
sql create database db vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
if $data(db)[4] != 1 then
return -1
endi
sql show dnodes
if $data(2)[2] != 1 then
return -1
endi
# vnodes
sql show dnodes
if $data(2)[2] != 1 then
return -1
endi
# v1_dnode
sql show db.vgroups
if $data(2)[3] != 2 then
return -1
endi
sql_error alter database db replica 3
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(101, "102")
sql insert into db.ctb values(now, 1, "2")
sql select * from db.stb
if $rows != 1 then
return -1
endi
print =============== step3: create dnodes
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$loop_cnt = 0
step3:
$loop_cnt = $loop_cnt + 1
sleep 1000
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data24 $data35
if $rows != 4 then
return -1
endi
if $data(1)[4] != ready then
goto step3
endi
if $data(2)[4] != ready then
goto step3
endi
if $data(3)[4] != ready then
goto step3
endi
if $data(4)[4] != ready then
goto step3
endi
return
print ============= step4: alter database
sql alter database db replica 3
if $rows != 3 then
return -1
endi
if $data(db)[4] != 3 then
return -1
endi
sql select * from db.stb
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册