提交 907ff2e4 编写于 作者: S Shengliang Guan

Merge branch 'feature/sync-mnode-integration' into fix/mnode

...@@ -29,6 +29,7 @@ extern "C" { ...@@ -29,6 +29,7 @@ extern "C" {
typedef struct SMnode SMnode; typedef struct SMnode SMnode;
typedef struct { typedef struct {
bool isStandBy;
bool deploy; bool deploy;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
......
...@@ -44,12 +44,9 @@ extern "C" { ...@@ -44,12 +44,9 @@ extern "C" {
} }
#define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t) #define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t)
#define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t) #define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t)
#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t) #define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t)
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \ #define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
{ \ { \
...@@ -66,11 +63,8 @@ extern "C" { ...@@ -66,11 +63,8 @@ extern "C" {
} }
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t) #define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t) #define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t) #define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t) #define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ #define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
...@@ -356,6 +350,14 @@ typedef struct SSdb { ...@@ -356,6 +350,14 @@ typedef struct SSdb {
SdbDecodeFp decodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX];
} SSdb; } SSdb;
typedef struct SSdbIter {
TdFilePtr file;
int64_t readlen;
} SSdbIter;
SSdbIter *sdbIterInit(SSdb *pSdb);
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -82,14 +82,29 @@ typedef struct SFsmCbMeta { ...@@ -82,14 +82,29 @@ typedef struct SFsmCbMeta {
SyncTerm currentTerm; SyncTerm currentTerm;
} SFsmCbMeta; } SFsmCbMeta;
typedef struct SReConfigCbMeta {
int32_t code;
SyncIndex index;
SyncTerm term;
SyncTerm currentTerm;
} SReConfigCbMeta;
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len);
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
// int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM; } SSyncFSM;
// abstract definition of log store in raft // abstract definition of log store in raft
......
...@@ -428,11 +428,11 @@ enum { ...@@ -428,11 +428,11 @@ enum {
}; };
#define DEFAULT_HANDLE 0 #define DEFAULT_HANDLE 0
#define MNODE_HANDLE -1 #define MNODE_HANDLE 1
#define QNODE_HANDLE -2 #define QNODE_HANDLE -1
#define SNODE_HANDLE -3 #define SNODE_HANDLE -2
#define VNODE_HANDLE -4 #define VNODE_HANDLE -3
#define BNODE_HANDLE -5 #define BNODE_HANDLE -4
#define TSDB_CONFIG_OPTION_LEN 16 #define TSDB_CONFIG_OPTION_LEN 16
#define TSDB_CONIIG_VALUE_LEN 48 #define TSDB_CONIIG_VALUE_LEN 48
......
...@@ -55,9 +55,31 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu ...@@ -55,9 +55,31 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica; if (pMgmt->replica > 1) {
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (pMgmt->replicas[i].id == pMgmt->pData->dnodeId) {
pReplica->id = pMgmt->replicas[i].id;
pReplica->port = pMgmt->replicas[i].port;
memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN);
}
}
pMgmt->selfIndex = pOption->selfIndex;
pOption->isStandBy = 1;
} else {
pOption->replica = pMgmt->replica;
pOption->selfIndex = -1;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
for (int32_t i = 0; i < pOption->replica; ++i) {
if (pOption->replicas[i].id == pMgmt->pData->dnodeId) {
pOption->selfIndex = i;
}
}
pMgmt->selfIndex = pOption->selfIndex;
}
pOption->deploy = false; pOption->deploy = false;
} }
......
...@@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dTrace("msg:%p, get from mnode-sync queue", pMsg); dTrace("msg:%p, get from mnode-sync queue", pMsg);
pMsg->info.node = pMgmt->pMnode; pMsg->info.node = pMgmt->pMnode;
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
int32_t code = mndProcessSyncMsg(pMsg); int32_t code = mndProcessSyncMsg(pMsg);
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
......
...@@ -76,11 +76,12 @@ typedef struct { ...@@ -76,11 +76,12 @@ typedef struct {
typedef struct { typedef struct {
SWal *pWal; SWal *pWal;
int32_t errCode;
bool restored;
sem_t syncSem; sem_t syncSem;
int64_t sync; int64_t sync;
ESyncState state; ESyncState state;
bool isStandBy;
bool restored;
int32_t errCode;
} SSyncMgmt; } SSyncMgmt;
typedef struct { typedef struct {
......
...@@ -39,14 +39,16 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -39,14 +39,16 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
int32_t mndInitMnode(SMnode *pMnode) { int32_t mndInitMnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_MNODE, SSdbTable table = {
.keyType = SDB_KEY_INT32, .sdbType = SDB_MNODE,
.deployFp = (SdbDeployFp)mndCreateDefaultMnode, .keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndMnodeActionEncode, .deployFp = (SdbDeployFp)mndCreateDefaultMnode,
.decodeFp = (SdbDecodeFp)mndMnodeActionDecode, .encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
.insertFp = (SdbInsertFp)mndMnodeActionInsert, .decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
.updateFp = (SdbUpdateFp)mndMnodeActionUpdate, .insertFp = (SdbInsertFp)mndMnodeActionInsert,
.deleteFp = (SdbDeleteFp)mndMnodeActionDelete}; .updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndMnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
......
...@@ -17,22 +17,26 @@ ...@@ -17,22 +17,26 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
}
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSdb *pSdb = pMnode->pSdb; SSdbRaw *pRaw = pMsg->pCont;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state)); mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
sdbWriteWithoutFree(pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyIndex(pSdb, cbMeta.index); sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
sdbSetApplyTerm(pSdb, cbMeta.term); sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMgmt->syncSem); tsem_post(&pMnode->syncMgmt.syncSem);
} }
} }
...@@ -49,15 +53,41 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { ...@@ -49,15 +53,41 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
pMnode->syncMgmt.restored = true; pMnode->syncMgmt.restored = true;
} }
void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) {
SMnode *pMnode = pFsm->data;
SSdbIter *pIter = iter;
if (iter == NULL) {
pIter = sdbIterInit(pMnode->pSdb);
}
return sdbIterRead(pMnode->pSdb, pIter, ppBuf, len);
}
int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) {
SMnode *pMnode = pFsm->data;
sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf);
return 0;
}
void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
}
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode; pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg; pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpPreCommitCb = NULL; pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL; pFsm->FpRollBackCb = NULL;
pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpRestoreFinish = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpRestoreSnapshot = NULL; pFsm->FpSnapshotRead = mndSnapshotRead;
pFsm->FpSnapshotApply = mndSnapshotApply;
pFsm->FpReConfigCb = mndReConfig;
return pFsm; return pFsm;
} }
...@@ -90,10 +120,13 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -90,10 +120,13 @@ int32_t mndInitSync(SMnode *pMnode) {
SSyncCfg *pCfg = &syncInfo.syncCfg; SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica; pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex; pCfg->myIndex = pMnode->selfIndex;
mInfo("start to open mnode sync, replica:%d myIndex:%d standBy:%d", pCfg->replicaNum, pCfg->myIndex,
pMgmt->isStandBy);
for (int32_t i = 0; i < pMnode->replica; ++i) { for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i]; SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pMnode->replicas[i].port; pNode->nodePort = pMnode->replicas[i].port;
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
} }
tsem_init(&pMgmt->syncSem, 0, 0); tsem_init(&pMgmt->syncSem, 0, 0);
...@@ -149,7 +182,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -149,7 +182,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync); if (pMgmt->isStandBy) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
mDebug("sync:%" PRId64 " is started", pMgmt->sync); mDebug("sync:%" PRId64 " is started", pMgmt->sync);
} }
...@@ -161,3 +198,18 @@ bool mndIsMaster(SMnode *pMnode) { ...@@ -161,3 +198,18 @@ bool mndIsMaster(SMnode *pMnode) {
return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
} }
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex};
mInfo("start to alter mnode sync, replica:%d myIndex:%d standBy:%d", cfg.replicaNum, cfg.myIndex, pOption->isStandBy);
for (int32_t i = 0; i < pOption->replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pOption->replicas[i].port;
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
}
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->isStandBy = pOption->isStandBy;
return syncReconfig(pMgmt->sync, &cfg);
}
\ No newline at end of file
...@@ -263,6 +263,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -263,6 +263,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb; pMnode->msgCb = pOption->msgCb;
pMnode->selfId = pOption->replicas[pOption->selfIndex].id; pMnode->selfId = pOption->replicas[pOption->selfIndex].id;
pMnode->syncMgmt.isStandBy = pOption->isStandBy;
} }
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
...@@ -329,12 +330,6 @@ void mndClose(SMnode *pMnode) { ...@@ -329,12 +330,6 @@ void mndClose(SMnode *pMnode) {
} }
} }
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
mDebug("start to alter mnode");
mDebug("mnode is altered");
return 0;
}
int32_t mndStart(SMnode *pMnode) { int32_t mndStart(SMnode *pMnode) {
mndSyncStart(pMnode); mndSyncStart(pMnode);
return mndInitTimer(pMnode); return mndInitTimer(pMnode);
......
...@@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) { ...@@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) {
return 0; return 0;
} }
SSdbIter *sdbIterInit(SSdb *pSdb) {
char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
if (taosCopyFile(datafile, tmpfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
return NULL;
}
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
if (pIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
if (pIter->file == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read snapshot file:%s since %s", tmpfile, terrstr());
taosMemoryFree(pIter);
return NULL;
}
mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter);
return pIter;
}
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) {
const int32_t maxlen = 100;
char *pBuf = taosMemoryCalloc(1, maxlen);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
if (readlen == 0) {
mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen);
taosMemoryFree(pBuf);
taosCloseFile(&pIter->file);
taosMemoryFree(pIter);
pIter = NULL;
} else if (readlen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen);
taosMemoryFree(pBuf);
taosCloseFile(&pIter->file);
taosMemoryFree(pIter);
pIter = NULL;
} else {
pIter->readlen += readlen;
mTrace("read snapshot, readlen:%" PRId64, pIter->readlen);
*ppBuf = pBuf;
*buflen = readlen;
}
return pIter;
}
...@@ -147,6 +147,10 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -147,6 +147,10 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinish = NULL; pFsm->FpRestoreFinishCb = NULL;
pFsm->FpSnapshotRead = NULL;
pFsm->FpSnapshotApply = NULL;
pFsm->FpReConfigCb = NULL;
return pFsm; return pFsm;
} }
\ No newline at end of file
...@@ -148,8 +148,8 @@ typedef struct SSyncNode { ...@@ -148,8 +148,8 @@ typedef struct SSyncNode {
SSyncRespMgr* pSyncRespMgr; SSyncRespMgr* pSyncRespMgr;
// restore state // restore state
bool restoreFinish; bool restoreFinish;
//sem_t restoreSem; // sem_t restoreSem;
SSnapshot* pSnapshot; SSnapshot* pSnapshot;
} SSyncNode; } SSyncNode;
......
...@@ -42,6 +42,7 @@ typedef struct SVotesGranted { ...@@ -42,6 +42,7 @@ typedef struct SVotesGranted {
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode);
void voteGrantedDestroy(SVotesGranted *pVotesGranted); void voteGrantedDestroy(SVotesGranted *pVotesGranted);
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode);
bool voteGrantedMajority(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
...@@ -65,6 +66,7 @@ typedef struct SVotesRespond { ...@@ -65,6 +66,7 @@ typedef struct SVotesRespond {
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
void votesRespondDestory(SVotesRespond *pVotesRespond); void votesRespondDestory(SVotesRespond *pVotesRespond);
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
......
...@@ -362,8 +362,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -362,8 +362,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// restore finish // restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) { if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinish != NULL) { if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->pFsm->FpRestoreFinish(ths->pFsm); ths->pFsm->FpRestoreFinishCb(ths->pFsm);
} }
ths->restoreFinish = true; ths->restoreFinish = true;
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
......
...@@ -139,8 +139,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -139,8 +139,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// restore finish // restore finish
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
if (pSyncNode->restoreFinish == false) { if (pSyncNode->restoreFinish == false) {
if (pSyncNode->pFsm->FpRestoreFinish != NULL) { if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) {
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm);
} }
pSyncNode->restoreFinish = true; pSyncNode->restoreFinish = true;
sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
......
...@@ -509,7 +509,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -509,7 +509,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
} }
//tsem_init(&(pSyncNode->restoreSem), 0, 0); // tsem_init(&(pSyncNode->restoreSem), 0, 0);
// start in syncNodeStart // start in syncNodeStart
// start raft // start raft
...@@ -606,7 +606,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -606,7 +606,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pSnapshot); taosMemoryFree(pSyncNode->pSnapshot);
} }
//tsem_destroy(&pSyncNode->restoreSem); // tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode // free memory in syncFreeNode
// taosMemoryFree(pSyncNode); // taosMemoryFree(pSyncNode);
...@@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
} }
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
bool hit = false;
for (int i = 0; i < newConfig->replicaNum; ++i) {
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 &&
pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) {
newConfig->myIndex = i;
hit = true;
break;
}
}
ASSERT(hit == true);
pSyncNode->pRaftCfg->cfg = *newConfig; pSyncNode->pRaftCfg->cfg = *newConfig;
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
ASSERT(ret == 0); ASSERT(ret == 0);
...@@ -949,6 +960,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { ...@@ -949,6 +960,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
} }
......
...@@ -45,6 +45,17 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) { ...@@ -45,6 +45,17 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
} }
} }
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) {
pVotesGranted->replicas = &(pSyncNode->replicasId);
pVotesGranted->replicaNum = pSyncNode->replicaNum;
voteGrantedClearVotes(pVotesGranted);
pVotesGranted->term = 0;
pVotesGranted->quorum = pSyncNode->quorum;
pVotesGranted->toLeader = false;
pVotesGranted->pSyncNode = pSyncNode;
}
bool voteGrantedMajority(SVotesGranted *pVotesGranted) { bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
bool ret = pVotesGranted->votes >= pVotesGranted->quorum; bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
return ret; return ret;
...@@ -168,6 +179,13 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) { ...@@ -168,6 +179,13 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) {
} }
} }
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
pVotesRespond->replicas = &(pSyncNode->replicasId);
pVotesRespond->replicaNum = pSyncNode->replicaNum;
pVotesRespond->term = 0;
pVotesRespond->pSyncNode = pSyncNode;
}
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
bool ret = false; bool ret = false;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) { for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
......
...@@ -73,9 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { ...@@ -73,9 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0; return 0;
} }
void FpRestoreFinishCb(struct SSyncFSM* pFsm) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
sTrace("==callback== ==FpRestoreFinishCb==");
}
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
...@@ -83,7 +81,7 @@ SSyncFSM* createFsm() { ...@@ -83,7 +81,7 @@ SSyncFSM* createFsm() {
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshot = GetSnapshotCb;
pFsm->FpRestoreFinish = FpRestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
return pFsm; return pFsm;
} }
......
...@@ -136,7 +136,7 @@ echo "qDebugFlag 143" >> $TAOS_CFG ...@@ -136,7 +136,7 @@ echo "qDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG
echo "sDebugFlag 135" >> $TAOS_CFG echo "sDebugFlag 143" >> $TAOS_CFG
echo "wDebugFlag 143" >> $TAOS_CFG echo "wDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != LEADER then
return -1
endi
print =============== create dnodes
sql create dnode $hostname port 7200
sleep 2000
sql show dnodes;
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
print $data02
if $data02 != 0 then
return -1
endi
if $data12 != 0 then
return -1
endi
if $data04 != ready then
return -1
endi
if $data14 != ready then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != LEADER then
return -1
endi
print =============== create mnode 2
sql create mnode on dnode 2
sql show mnodes
if $rows != 2 then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册