未验证 提交 964da014 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12942 from taosdata/fix/mnode

feat: create mnode
......@@ -29,6 +29,7 @@ extern "C" {
typedef struct SMnode SMnode;
typedef struct {
bool standby;
bool deploy;
int8_t replica;
int8_t selfIndex;
......
......@@ -44,12 +44,9 @@ extern "C" {
}
#define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t)
#define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t)
#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t)
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
{ \
......@@ -66,11 +63,8 @@ extern "C" {
}
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
......@@ -356,6 +350,14 @@ typedef struct SSdb {
SdbDecodeFp decodeFps[SDB_MAX];
} SSdb;
typedef struct SSdbIter {
TdFilePtr file;
int64_t readlen;
} SSdbIter;
SSdbIter *sdbIterInit(SSdb *pSdb);
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len);
#ifdef __cplusplus
}
#endif
......
......@@ -82,14 +82,29 @@ typedef struct SFsmCbMeta {
SyncTerm currentTerm;
} SFsmCbMeta;
typedef struct SReConfigCbMeta {
int32_t code;
SyncIndex index;
SyncTerm term;
SyncTerm currentTerm;
} SReConfigCbMeta;
typedef struct SSyncFSM {
void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len);
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
// int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM;
// abstract definition of log store in raft
......
......@@ -428,11 +428,11 @@ enum {
};
#define DEFAULT_HANDLE 0
#define MNODE_HANDLE -1
#define QNODE_HANDLE -2
#define SNODE_HANDLE -3
#define VNODE_HANDLE -4
#define BNODE_HANDLE -5
#define MNODE_HANDLE 1
#define QNODE_HANDLE -1
#define SNODE_HANDLE -2
#define VNODE_HANDLE -3
#define BNODE_HANDLE -4
#define TSDB_CONFIG_OPTION_LEN 16
#define TSDB_CONIIG_VALUE_LEN 48
......
......@@ -53,43 +53,45 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
*pDeployed = deployed->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", file);
goto _OVER;
}
pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto _OVER;
}
for (int32_t i = 0; i < pMgmt->replica; ++i) {
cJSON *node = cJSON_GetArrayItem(mnodes, i);
if (node == NULL) break;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
if (mnodes != NULL) {
if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", file);
goto _OVER;
}
pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file);
pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto _OVER;
}
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", file);
goto _OVER;
for (int32_t i = 0; i < pMgmt->replica; ++i) {
cJSON *node = cJSON_GetArrayItem(mnodes, i);
if (node == NULL) break;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file);
goto _OVER;
}
pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file);
goto _OVER;
}
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", file);
goto _OVER;
}
pReplica->port = port->valueint;
}
pReplica->port = port->valueint;
}
code = 0;
......@@ -122,21 +124,23 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pMsg != NULL) {
pReplica = &pMsg->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
if (replica > 0) {
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pMsg != NULL) {
pReplica = &pMsg->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
}
}
......
......@@ -39,32 +39,44 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
}
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
pOption->standby = false;
pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb;
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1;
pReplica->port = tsServerPort;
tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->msgCb = pMgmt->msgCb;
pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pOption->deploy = false;
pOption->standby = false;
if (pMgmt->replica > 0) {
pOption->standby = true;
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue;
pReplica->id = pMgmt->replicas[i].id;
pReplica->port = pMgmt->replicas[i].port;
memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN);
}
}
}
static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
pOption->msgCb = pMgmt->msgCb;
pOption->standby = false;
pOption->deploy = false;
pOption->replica = pCreate->replica;
pOption->selfIndex = -1;
for (int32_t i = 0; i < pCreate->replica; ++i) {
SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pCreate->replicas[i].id;
......@@ -79,17 +91,13 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
dError("failed to build mnode options since %s", terrstr());
return -1;
}
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
return 0;
}
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) {
if (mmBuildOptionForAlter(pMgmt, &option, pMsg) != 0) {
return -1;
}
......@@ -97,12 +105,6 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
return -1;
}
bool deployed = true;
if (mmWriteFile(pMgmt, pMsg, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
return 0;
}
......@@ -177,7 +179,8 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
tmsgReportStartup("mnode-worker", "initialized");
if (!deployed) {
if (!deployed || pMgmt->replica > 0) {
pMgmt->replica = 0;
deployed = true;
if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
......
......@@ -61,6 +61,11 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dTrace("msg:%p, get from mnode-sync queue", pMsg);
pMsg->info.node = pMgmt->pMnode;
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
int32_t code = mndProcessSyncMsg(pMsg);
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
......
......@@ -76,11 +76,12 @@ typedef struct {
typedef struct {
SWal *pWal;
int32_t errCode;
bool restored;
sem_t syncSem;
int64_t sync;
ESyncState state;
bool standby;
bool restored;
int32_t errCode;
} SSyncMgmt;
typedef struct {
......
......@@ -39,14 +39,16 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
int32_t mndInitMnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_MNODE,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultMnode,
.encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
.insertFp = (SdbInsertFp)mndMnodeActionInsert,
.updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndMnodeActionDelete};
SSdbTable table = {
.sdbType = SDB_MNODE,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultMnode,
.encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
.insertFp = (SdbInsertFp)mndMnodeActionInsert,
.updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndMnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
......
......@@ -17,22 +17,26 @@
#include "mndSync.h"
#include "mndTrans.h"
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
}
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SMnode *pMnode = pFsm->data;
SSdb *pSdb = pMnode->pSdb;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
SMnode *pMnode = pFsm->data;
SSdbRaw *pRaw = pMsg->pCont;
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
sdbWriteWithoutFree(pSdb, pRaw);
sdbSetApplyIndex(pSdb, cbMeta.index);
sdbSetApplyTerm(pSdb, cbMeta.term);
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMgmt->syncSem);
tsem_post(&pMnode->syncMgmt.syncSem);
}
}
......@@ -49,15 +53,41 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
pMnode->syncMgmt.restored = true;
}
void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) {
SMnode *pMnode = pFsm->data;
SSdbIter *pIter = iter;
if (iter == NULL) {
pIter = sdbIterInit(pMnode->pSdb);
}
return sdbIterRead(pMnode->pSdb, pIter, ppBuf, len);
}
int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) {
SMnode *pMnode = pFsm->data;
sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf);
return 0;
}
void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
}
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpRestoreFinish = mndRestoreFinish;
pFsm->FpRestoreSnapshot = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpSnapshotRead = mndSnapshotRead;
pFsm->FpSnapshotApply = mndSnapshotApply;
pFsm->FpReConfigCb = mndReConfig;
return pFsm;
}
......@@ -90,10 +120,13 @@ int32_t mndInitSync(SMnode *pMnode) {
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex;
mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex,
pMgmt->standby);
for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pMnode->replicas[i].port;
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
}
tsem_init(&pMgmt->syncSem, 0, 0);
......@@ -149,7 +182,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync);
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
mDebug("sync:%" PRId64 " is started", pMgmt->sync);
}
......@@ -161,3 +198,18 @@ bool mndIsMaster(SMnode *pMnode) {
return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
}
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
SSyncCfg cfg = {.replicaNum = pOption->replica, .myIndex = pOption->selfIndex};
mInfo("start to alter mnode sync, replica:%d myindex:%d standby:%d", cfg.replicaNum, cfg.myIndex, pOption->standby);
for (int32_t i = 0; i < pOption->replica; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pOption->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pOption->replicas[i].port;
mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
}
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->standby = pOption->standby;
return syncReconfig(pMgmt->sync, &cfg);
}
\ No newline at end of file
......@@ -263,6 +263,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb;
pMnode->selfId = pOption->replicas[pOption->selfIndex].id;
pMnode->syncMgmt.standby = pOption->standby;
}
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
......@@ -329,12 +330,6 @@ void mndClose(SMnode *pMnode) {
}
}
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
mDebug("start to alter mnode");
mDebug("mnode is altered");
return 0;
}
int32_t mndStart(SMnode *pMnode) {
mndSyncStart(pMnode);
return mndInitTimer(pMnode);
......
......@@ -392,3 +392,66 @@ int32_t sdbDeploy(SSdb *pSdb) {
return 0;
}
SSdbIter *sdbIterInit(SSdb *pSdb) {
char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
if (taosCopyFile(datafile, tmpfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
return NULL;
}
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
if (pIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
if (pIter->file == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read snapshot file:%s since %s", tmpfile, terrstr());
taosMemoryFree(pIter);
return NULL;
}
mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter);
return pIter;
}
SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) {
const int32_t maxlen = 100;
char *pBuf = taosMemoryCalloc(1, maxlen);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
if (readlen == 0) {
mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen);
taosMemoryFree(pBuf);
taosCloseFile(&pIter->file);
taosMemoryFree(pIter);
pIter = NULL;
} else if (readlen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen);
taosMemoryFree(pBuf);
taosCloseFile(&pIter->file);
taosMemoryFree(pIter);
pIter = NULL;
} else {
pIter->readlen += readlen;
mTrace("read snapshot, readlen:%" PRId64, pIter->readlen);
*ppBuf = pBuf;
*buflen = readlen;
}
return pIter;
}
......@@ -147,6 +147,10 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinish = NULL;
pFsm->FpRestoreFinishCb = NULL;
pFsm->FpSnapshotRead = NULL;
pFsm->FpSnapshotApply = NULL;
pFsm->FpReConfigCb = NULL;
return pFsm;
}
\ No newline at end of file
......@@ -148,8 +148,8 @@ typedef struct SSyncNode {
SSyncRespMgr* pSyncRespMgr;
// restore state
bool restoreFinish;
//sem_t restoreSem;
bool restoreFinish;
// sem_t restoreSem;
SSnapshot* pSnapshot;
} SSyncNode;
......
......@@ -42,6 +42,7 @@ typedef struct SVotesGranted {
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode);
void voteGrantedDestroy(SVotesGranted *pVotesGranted);
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode);
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
......@@ -65,6 +66,7 @@ typedef struct SVotesRespond {
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
void votesRespondDestory(SVotesRespond *pVotesRespond);
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
......
......@@ -362,8 +362,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinish != NULL) {
ths->pFsm->FpRestoreFinish(ths->pFsm);
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
......
......@@ -139,8 +139,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// restore finish
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
if (pSyncNode->restoreFinish == false) {
if (pSyncNode->pFsm->FpRestoreFinish != NULL) {
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) {
pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm);
}
pSyncNode->restoreFinish = true;
sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
......
......@@ -509,7 +509,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
}
//tsem_init(&(pSyncNode->restoreSem), 0, 0);
// tsem_init(&(pSyncNode->restoreSem), 0, 0);
// start in syncNodeStart
// start raft
......@@ -606,7 +606,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pSnapshot);
}
//tsem_destroy(&pSyncNode->restoreSem);
// tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
......@@ -920,6 +920,17 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
}
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
bool hit = false;
for (int i = 0; i < newConfig->replicaNum; ++i) {
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 &&
pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) {
newConfig->myIndex = i;
hit = true;
break;
}
}
ASSERT(hit == true);
pSyncNode->pRaftCfg->cfg = *newConfig;
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
ASSERT(ret == 0);
......@@ -949,6 +960,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
}
......
......@@ -45,6 +45,17 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
}
}
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) {
pVotesGranted->replicas = &(pSyncNode->replicasId);
pVotesGranted->replicaNum = pSyncNode->replicaNum;
voteGrantedClearVotes(pVotesGranted);
pVotesGranted->term = 0;
pVotesGranted->quorum = pSyncNode->quorum;
pVotesGranted->toLeader = false;
pVotesGranted->pSyncNode = pSyncNode;
}
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
return ret;
......@@ -168,6 +179,13 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) {
}
}
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) {
pVotesRespond->replicas = &(pSyncNode->replicasId);
pVotesRespond->replicaNum = pSyncNode->replicaNum;
pVotesRespond->term = 0;
pVotesRespond->pSyncNode = pSyncNode;
}
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
bool ret = false;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
......
......@@ -73,9 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0;
}
void FpRestoreFinishCb(struct SSyncFSM* pFsm) {
sTrace("==callback== ==FpRestoreFinishCb==");
}
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
......@@ -83,7 +81,7 @@ SSyncFSM* createFsm() {
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb;
pFsm->FpRestoreFinish = FpRestoreFinishCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb;
return pFsm;
}
......
......@@ -55,7 +55,8 @@
./test.sh -f tsim/bnode/basic1.sim
# ---- mnode
./test.sh -f tsim/mnode/basic1.sim
#./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
# ---- show
./test.sh -f tsim/show/basic.sim
......@@ -106,7 +107,7 @@
./test.sh -f tsim/tmq/basic3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m
#./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
......
......@@ -136,7 +136,7 @@ echo "qDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "sDebugFlag 135" >> $TAOS_CFG
echo "sDebugFlag 143" >> $TAOS_CFG
echo "wDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != LEADER then
return -1
endi
print =============== create dnodes
sql create dnode $hostname port 7200
sleep 2000
sql show dnodes;
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
print $data02
if $data02 != 0 then
return -1
endi
if $data12 != 0 then
return -1
endi
if $data04 != ready then
return -1
endi
if $data14 != ready then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != LEADER then
return -1
endi
print =============== create mnode 2
sql create mnode on dnode 2
sql show mnodes
if $rows != 2 then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册