未验证 提交 117d2ce4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8471 from taosdata/feature/dnode3

Feature/dnode3
......@@ -721,6 +721,8 @@ typedef struct {
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t fsyncPeriod;
int8_t reserved[16];
int8_t precision;
int8_t compression;
int8_t cacheLastRow;
......@@ -728,8 +730,7 @@ typedef struct {
int8_t walLevel;
int8_t replica;
int8_t quorum;
int8_t reserved[9];
int32_t fsyncPeriod;
int8_t selfIndex;
SVnodeDesc nodes[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg;
......
......@@ -23,9 +23,9 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
typedef int64_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef int32_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SSyncTerm;
typedef enum {
......@@ -41,41 +41,41 @@ typedef struct {
typedef struct {
SyncNodeId nodeId;
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo;
typedef struct {
int selfIndex;
int nNode;
SNodeInfo* nodeInfo;
int32_t selfIndex;
int32_t replica;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
} SSyncCluster;
typedef struct {
int32_t selfIndex;
int nNode;
SNodeInfo* node;
ESyncRole* role;
int32_t selfIndex;
int32_t replica;
SNodeInfo node[TSDB_MAX_REPLICA];
ESyncRole role[TSDB_MAX_REPLICA];
} SNodesRole;
typedef struct SSyncFSM {
void* pData;
// apply committed log, bufs will be free by raft module
int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
// cluster commit callback
int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
// fsm return snapshot in ppBuf, bufs will be free by raft module
// TODO: getSnapshot SHOULD be async?
int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast);
int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast);
// fsm apply snapshot with pBuf data
int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast);
int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast);
// call when restore snapshot and log done
int (*onRestoreDone)(struct SSyncFSM* fsm);
int32_t (*onRestoreDone)(struct SSyncFSM* fsm);
void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
......@@ -101,13 +101,13 @@ typedef struct SSyncLogStore {
typedef struct SSyncServerState {
SyncNodeId voteFor;
SSyncTerm term;
SSyncTerm term;
} SSyncServerState;
typedef struct SSyncClusterConfig {
// Log index number of current cluster config.
SyncIndex index;
// Log index number of previous cluster config.
SyncIndex prevIndex;
......@@ -118,25 +118,21 @@ typedef struct SSyncClusterConfig {
typedef struct SStateManager {
void* pData;
void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state);
int32_t (*saveServerState)(struct SStateManager* stateMng, SSyncServerState* state);
const SSyncServerState* (*readServerState)(struct SStateManager* stateMng);
int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state);
void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
// void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
// const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
} SStateManager;
typedef struct {
SyncGroupId vgId;
SyncIndex snapshotIndex;
SSyncCluster syncCfg;
SSyncFSM fsm;
SyncGroupId vgId;
SyncIndex snapshotIndex;
SSyncCluster syncCfg;
SSyncFSM fsm;
SSyncLogStore logStore;
SStateManager stateManager;
} SSyncInfo;
......@@ -146,19 +142,20 @@ typedef struct SSyncNode SSyncNode;
int32_t syncInit();
void syncCleanUp();
SSyncNode syncStart(const SSyncInfo*);
void syncStop(SyncNodeId);
SSyncNode* syncStart(const SSyncInfo*);
void syncReconfig(const SSyncNode*, const SSyncCluster*);
void syncStop(const SSyncNode*);
int32_t syncPropose(SSyncNode syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
extern int32_t syncDebugFlag;
extern int32_t syncDebugFlag;
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_H*/
#endif /*_TD_LIBS_SYNC_H*/
......@@ -44,41 +44,41 @@ typedef struct {
EWalType walLevel; // wal level
} SWalCfg;
typedef void * twalh; // WAL HANDLE
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
struct SWal;
typedef struct SWal SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
//module initialization
int32_t walInit();
void walCleanUp();
// module initialization
int32_t walInit();
void walCleanUp();
//handle open and ctl
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh);
// handle open and ctl
SWal *walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
//write
//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(twalh, void* body, int32_t bodyLen);
int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize);
// write
// int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen);
int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
//apis for lifecycle management
void walFsync(twalh, bool force);
int32_t walCommit(twalh, int64_t ver);
//truncate after
int32_t walRollback(twalh, int64_t ver);
//notify that previous log can be pruned safely
int32_t walPrune(twalh, int64_t ver);
// apis for lifecycle management
void walFsync(SWal *, bool force);
int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous log can be pruned safely
int32_t walPrune(SWal *, int64_t ver);
//read
int32_t walRead(twalh, SWalHead **, int64_t ver);
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum);
// read
int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
//lifecycle check
int32_t walFirstVer(twalh);
int32_t walPersistedVer(twalh);
int32_t walLastVer(twalh);
//int32_t walDataCorrupted(twalh);
// lifecycle check
int32_t walFirstVer(SWal *);
int32_t walPersistedVer(SWal *);
int32_t walLastVer(SWal *);
// int32_t walDataCorrupted(SWal*);
#ifdef __cplusplus
}
......
......@@ -15,5 +15,12 @@
#include "sync.h"
int32_t syncInit() {return 0;}
void syncCleanUp() {}
\ No newline at end of file
int32_t syncInit() { return 0; }
void syncCleanUp() {}
SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; }
void syncStop(const SSyncNode* pNode) {}
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
\ No newline at end of file
......@@ -19,6 +19,18 @@ int32_t walInit() { return 0; }
void walCleanUp() {}
twalh walOpen(char *path, SWalCfg *pCfg) { return NULL; }
SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; }
int32_t walAlter(twalh pWal, SWalCfg *pCfg) { return 0; }
\ No newline at end of file
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
void walClose(SWal *pWal) {}
void walFsync(SWal *pWal, bool force) {}
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {}
int32_t walCommit(SWal *pWal, int64_t ver) { return 0; }
int32_t walRollback(SWal *pWal, int64_t ver) { return 0; }
int32_t walPrune(SWal *pWal, int64_t ver) { return 0; }
\ No newline at end of file
......@@ -24,7 +24,7 @@ extern "C" {
tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId();
char *mnodeGetClusterId();
int64_t mnodeGetClusterId();
EMnStatus mnodeGetStatus();
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
......
......@@ -202,12 +202,13 @@ static void mnodeSendTelemetryReport() {
return;
}
char clusterId[TSDB_CLUSTER_ID_LEN] = {0};
mnodeGetClusterId(clusterId);
int64_t clusterId = mnodeGetClusterId();
char clusterIdStr[20] = {0};
snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false);
mnodeBeginObject(&bw);
mnodeAddStringField(&bw, "instanceId", clusterId);
mnodeAddStringField(&bw, "instanceId", clusterIdStr);
mnodeAddIntField(&bw, "reportVersion", 1);
mnodeAddOsInfo(&bw);
mnodeAddCpuInfo(&bw);
......
......@@ -39,7 +39,7 @@
static struct {
int32_t state;
int32_t dnodeId;
char clusterId[TSDB_CLUSTER_ID_LEN];
int64_t clusterId;
tmr_h timer;
SMnodeFp fp;
SSteps * steps1;
......@@ -50,7 +50,7 @@ tmr_h mnodeGetTimer() { return tsMint.timer; }
int32_t mnodeGetDnodeId() { return tsMint.dnodeId; }
char *mnodeGetClusterId() { return tsMint.clusterId; }
int64_t mnodeGetClusterId() { return tsMint.clusterId; }
EMnStatus mnodeGetStatus() { return tsMint.state; }
......@@ -71,12 +71,14 @@ int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; }
static int32_t mnodeSetPara(SMnodePara para) {
tsMint.fp = para.fp;
tsMint.dnodeId = para.dnodeId;
strncpy(tsMint.clusterId, para.clusterId, TSDB_CLUSTER_ID_LEN);
tsMint.clusterId = para.clusterId;
if (tsMint.fp.SendMsgToDnode == NULL) return -1;
if (tsMint.fp.SendMsgToMnode == NULL) return -1;
if (tsMint.fp.SendRedirectMsg == NULL) return -1;
if (tsMint.fp.GetDnodeEp == NULL) return -1;
if (tsMint.dnodeId < 0) return -1;
if (tsMint.clusterId < 0) return -1;
return 0;
}
......@@ -141,7 +143,7 @@ static void mnodeCleanupStep2() { taosStepCleanup(tsMint.steps2); }
static bool mnodeNeedDeploy() {
if (tsMint.dnodeId > 0) return false;
if (tsMint.clusterId[0] != 0) return false;
if (tsMint.clusterId > 0) return false;
if (strcmp(tsFirst, tsLocalEp) != 0) return false;
return true;
}
......@@ -154,7 +156,7 @@ int32_t mnodeDeploy() {
tsMint.state = MN_STATUS_INIT;
}
if (tsMint.dnodeId <= 0 || tsMint.clusterId[0] == 0) {
if (tsMint.dnodeId <= 0 || tsMint.clusterId <= 0) {
mError("failed to deploy mnode since cluster not ready");
return TSDB_CODE_MND_NOT_READY;
}
......
......@@ -23,8 +23,8 @@ extern "C" {
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState);
int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState);
int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState);
int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState);
#ifdef __cplusplus
}
......
......@@ -62,19 +62,14 @@ typedef struct STsdbCfg {
typedef struct SMetaCfg {
} SMetaCfg;
typedef struct SSyncCluster {
int8_t replica;
int8_t quorum;
SNodeInfo nodes[TSDB_MAX_REPLICA];
} SSyncCfg;
typedef struct SVnodeCfg {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int8_t dropped;
SWalCfg wal;
STsdbCfg tsdb;
SMetaCfg meta;
SSyncCfg sync;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int8_t dropped;
int8_t quorum;
SWalCfg wal;
STsdbCfg tsdb;
SMetaCfg meta;
SSyncCluster sync;
} SVnodeCfg;
typedef struct {
......@@ -84,9 +79,9 @@ typedef struct {
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
twalh pWal;
SWal *pWal;
void *pQuery;
SyncNodeId syncNode;
SSyncNode *pSync;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
......
......@@ -30,149 +30,156 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) {
fp = fopen(file, "r");
if (!fp) {
vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", vgId, file, strerror(errno));
vError("vgId:%d, failed to open vnode cfg file:%s to read since %s", vgId, file, strerror(errno));
ret = TAOS_SYSTEM_ERROR(errno);
goto PARSE_VCFG_ERROR;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
vError("vgId:%d, failed to read %s, content is null", vgId, file);
vError("vgId:%d, failed to read %s since content is null", vgId, file);
goto PARSE_VCFG_ERROR;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read %s, invalid json format", vgId, file);
vError("vgId:%d, failed to read %s since invalid json format", vgId, file);
goto PARSE_VCFG_ERROR;
}
cJSON *db = cJSON_GetObjectItem(root, "db");
if (!db || db->type != cJSON_String || db->valuestring == NULL) {
vError("vgId:%d, failed to read %s, db not found", vgId, file);
vError("vgId:%d, failed to read %s since db not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
tstrncpy(pCfg->db, db->valuestring, sizeof(pCfg->db));
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, dropped not found", vgId, file);
vError("vgId:%d, failed to read %s since dropped not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->dropped = (int32_t)dropped->valueint;
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, quorum not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->quorum = (int8_t)quorum->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, cacheBlockSize not found", vgId, file);
vError("vgId:%d, failed to read %s since cacheBlockSize not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.cacheBlockSize = (int32_t)cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, totalBlocks not found", vgId, file);
vError("vgId:%d, failed to read %s since totalBlocks not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.totalBlocks = (int32_t)totalBlocks->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysPerFile not found", vgId, file);
vError("vgId:%d, failed to read %s since daysPerFile not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysPerFile = (int32_t)daysPerFile->valueint;
cJSON *daysToKeep0 = cJSON_GetObjectItem(root, "daysToKeep0");
if (!daysToKeep0 || daysToKeep0->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep0 not found", vgId, file);
vError("vgId:%d, failed to read %s since daysToKeep0 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep0 = (int32_t)daysToKeep0->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep1 not found", vgId, file);
vError("vgId:%d, failed to read %s since daysToKeep1 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep1 = (int32_t)daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep2 not found", vgId, file);
vError("vgId:%d, failed to read %s since daysToKeep2 not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.daysToKeep2 = (int32_t)daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", vgId, file);
vError("vgId:%d, failed to read %s since minRowsPerFileBlock not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint;
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", vgId, file);
vError("vgId:%d, failed to read %s since maxRowsPerFileBlock not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, precision not found", vgId, file);
vError("vgId:%d, failed to read %s since precision not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.precision = (int8_t)precision->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression");
if (!compression || compression->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, compression not found", vgId, file);
vError("vgId:%d, failed to read %s since compression not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.compression = (int8_t)compression->valueint;
cJSON *update = cJSON_GetObjectItem(root, "update");
if (!update || update->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, update not found", vgId, file);
vError("vgId: %d, failed to read %s since update not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.update = (int8_t)update->valueint;
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", vgId, file);
vError("vgId: %d, failed to read %s since cacheLastRow not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->tsdb.cacheLastRow = (int8_t)cacheLastRow->valueint;
cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, walLevel not found", vgId, file);
vError("vgId:%d, failed to read %s since walLevel not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->wal.walLevel = (int8_t)walLevel->valueint;
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, fsyncPeriod not found", vgId, file);
vError("vgId:%d, failed to read %s since fsyncPeriod not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->wal.fsyncPeriod = (int32_t)fsyncPeriod->valueint;
cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, replica not found", vgId, file);
cJSON *selfIndex = cJSON_GetObjectItem(root, "selfIndex");
if (!selfIndex || selfIndex->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since selfIndex not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->sync.replica = (int8_t)replica->valueint;
pCfg->sync.selfIndex = selfIndex->valueint;
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, quorum not found", vgId, file);
cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since replica not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
pCfg->sync.quorum = (int8_t)quorum->valueint;
pCfg->sync.replica = replica->valueint;
cJSON *nodes = cJSON_GetObjectItem(root, "nodes");
if (!nodes || nodes->type != cJSON_Array) {
......@@ -182,28 +189,35 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) {
int size = cJSON_GetArraySize(nodes);
if (size != pCfg->sync.replica) {
vError("vgId:%d, failed to read %s, nodes size not matched", vgId, file);
vError("vgId:%d, failed to read %s since nodes size not matched", vgId, file);
goto PARSE_VCFG_ERROR;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodes, i);
if (nodeInfo == NULL) continue;
SNodeInfo *node = &pCfg->sync.nodes[i];
SNodeInfo *node = &pCfg->sync.nodeInfo[i];
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "id");
if (!nodeId || nodeId->type != cJSON_Number) {
vError("vgId:%d, failed to read %s since nodeId not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
node->nodeId = nodeId->valueint;
cJSON *port = cJSON_GetObjectItem(nodeInfo, "port");
if (!port || port->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, port not found", vgId, file);
cJSON *nodePort = cJSON_GetObjectItem(nodeInfo, "port");
if (!nodePort || nodePort->type != cJSON_Number) {
vError("vgId:%d, failed to read %s sincenodePort not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
node->nodePort = (uint16_t)port->valueint;
node->nodePort = (uint16_t)nodePort->valueint;
cJSON *fqdn = cJSON_GetObjectItem(nodeInfo, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
vError("vgId:%d, failed to read %s, fqdn not found", vgId, file);
cJSON *nodeFqdn = cJSON_GetObjectItem(nodeInfo, "fqdn");
if (!nodeFqdn || nodeFqdn->type != cJSON_String || nodeFqdn->valuestring == NULL) {
vError("vgId:%d, failed to read %s since nodeFqdn not found", vgId, file);
goto PARSE_VCFG_ERROR;
}
tstrncpy(node->nodeFqdn, fqdn->valuestring, TSDB_FQDN_LEN);
tstrncpy(node->nodeFqdn, nodeFqdn->valuestring, TSDB_FQDN_LEN);
}
ret = TSDB_CODE_SUCCESS;
......@@ -238,6 +252,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", vgId);
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pCfg->db);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pCfg->dropped);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->quorum);
// tsdb
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pCfg->tsdb.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pCfg->tsdb.totalBlocks);
......@@ -255,11 +270,12 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pCfg->wal.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsyncPeriod\": %d,\n", pCfg->wal.fsyncPeriod);
// sync
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->sync.quorum);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pCfg->sync.replica);
len += snprintf(content + len, maxLen - len, " \"selfIndex\": %d,\n", pCfg->sync.selfIndex);
len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n");
for (int32_t i = 0; i < pCfg->sync.replica; i++) {
SNodeInfo *node = &pCfg->sync.nodes[i];
SNodeInfo *node = &pCfg->sync.nodeInfo[i];
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", node->nodeId);
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", node->nodePort);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\"\n", node->nodeFqdn);
if (i < pCfg->sync.replica - 1) {
......@@ -280,7 +296,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState) {
int32_t ret = TSDB_CODE_VND_APP_ERROR;
int32_t len = 0;
int32_t maxLen = 100;
......@@ -289,7 +305,7 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
FILE *fp = NULL;
char file[PATH_MAX + 30] = {0};
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId);
sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId);
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
......@@ -304,20 +320,20 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
}
cJSON *term = cJSON_GetObjectItem(root, "term");
if (!term || term->type != cJSON_Number) {
if (!term || term->type != cJSON_String) {
vError("vgId:%d, failed to read %s since term not found", vgId, file);
goto PARSE_TERM_ERROR;
}
pState->term = (uint64_t)term->valueint;
pState->term = atoll(term->valuestring);
cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor");
if (!voteFor || voteFor->type != cJSON_Number) {
if (!voteFor || voteFor->type != cJSON_String) {
vError("vgId:%d, failed to read %s since voteFor not found", vgId, file);
goto PARSE_TERM_ERROR;
}
pState->voteFor = (int64_t)voteFor->valueint;
pState->voteFor = atoi(voteFor->valuestring);
vInfo("vgId:%d, read %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
vInfo("vgId:%d, read %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
PARSE_TERM_ERROR:
if (content != NULL) free(content);
......@@ -327,9 +343,9 @@ PARSE_TERM_ERROR:
return ret;
}
int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) {
int32_t vnodeSaveState(int32_t vgId, SSyncServerState *pState) {
char file[PATH_MAX + 30] = {0};
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId);
sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId);
FILE *fp = fopen(file, "w");
if (!fp) {
......@@ -342,8 +358,8 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) {
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"term\": %" PRIu64 "\n", pState->term);
len += snprintf(content + len, maxLen - len, " \"voteFor\": %" PRIu64 "\n", pState->voteFor);
len += snprintf(content + len, maxLen - len, " \"term\": \"%" PRIu64 "\",\n", pState->term);
len += snprintf(content + len, maxLen - len, " \"voteFor\": \"%d\"\n", pState->voteFor);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
......@@ -351,6 +367,6 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) {
fclose(fp);
free(content);
vInfo("vgId:%d, write %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
vInfo("vgId:%d, write %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -108,6 +108,11 @@ static void vnodeDestroyVnode(SVnode *pVnode) {
int32_t code = 0;
int32_t vgId = pVnode->vgId;
if (pVnode->pSync != NULL) {
syncStop(pVnode->pSync);
pVnode->pSync = NULL;
}
if (pVnode->pQuery) {
// todo
}
......@@ -125,7 +130,8 @@ static void vnodeDestroyVnode(SVnode *pVnode) {
}
if (pVnode->pWal) {
// todo
walClose(pVnode->pWal);
pVnode->pWal = NULL;
}
if (pVnode->allocator) {
......@@ -161,6 +167,56 @@ static void vnodeCleanupVnode(SVnode *pVnode) {
vnodeRelease(pVnode);
}
static inline int32_t vnodeLogWrite(struct SSyncLogStore *logStore, SyncIndex index, SSyncBuffer *pBuf) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walWrite(pVnode->pWal, index, pBuf->data, (int32_t)pBuf->len);
}
static inline int32_t vnodeLogCommit(struct SSyncLogStore *logStore, SyncIndex index) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walCommit(pVnode->pWal, index);
}
static inline int32_t vnodeLogPrune(struct SSyncLogStore *logStore, SyncIndex index) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walPrune(pVnode->pWal, index);
}
static inline int32_t vnodeLogRollback(struct SSyncLogStore *logStore, SyncIndex index) {
SVnode *pVnode = logStore->pData; // vnode status can be checked here
return walRollback(pVnode->pWal, index);
}
static inline int32_t vnodeSaveServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
SVnode *pVnode = stateMng->pData;
return vnodeSaveState(pVnode->vgId, pState);
}
static inline int32_t vnodeReadServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
SVnode *pVnode = stateMng->pData;
return vnodeSaveState(pVnode->vgId, pState);
}
static inline int32_t vnodeApplyLog(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
return 0;
}
static inline int32_t vnodeOnClusterChanged(struct SSyncFSM *fsm, const SSyncCluster *cluster, void *pData) { return 0; }
static inline int32_t vnodeGetSnapshot(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int32_t *objId, bool *isLast) {
return 0;
}
static inline int32_t vnodeApplySnapshot(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int32_t objId, bool isLast) {
return 0;
}
static inline int32_t vnodeOnRestoreDone(struct SSyncFSM *fsm) { return 0; }
static inline void vnodeOnRollback(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf) {}
static inline void vnodeOnRoleChanged(struct SSyncFSM *fsm, const SNodesRole *pRole) {}
static int32_t vnodeOpenVnode(int32_t vgId) {
int32_t code = 0;
......@@ -177,6 +233,9 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
pVnode->role = TAOS_SYNC_ROLE_CANDIDATE;
pthread_mutex_init(&pVnode->statusMutex, NULL);
vDebug("vgId:%d, vnode is opened", pVnode->vgId);
taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *));
code = vnodeReadCfg(vgId, &pVnode->cfg);
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId);
......@@ -185,7 +244,7 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
return 0;
}
code = vnodeReadTerm(vgId, &pVnode->term);
code = vnodeSaveState(vgId, &pVnode->term);
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code));
pVnode->cfg.dropped = 1;
......@@ -209,8 +268,33 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
return terrno;
}
vDebug("vgId:%d, vnode is opened", pVnode->vgId);
taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *));
// create sync node
SSyncInfo syncInfo = {0};
syncInfo.vgId = vgId;
syncInfo.snapshotIndex = 0; // todo, from tsdb
memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster));
syncInfo.fsm.pData = pVnode;
syncInfo.fsm.applyLog = vnodeApplyLog;
syncInfo.fsm.onClusterChanged = vnodeOnClusterChanged;
syncInfo.fsm.getSnapshot = vnodeGetSnapshot;
syncInfo.fsm.applySnapshot = vnodeApplySnapshot;
syncInfo.fsm.onRestoreDone = vnodeOnRestoreDone;
syncInfo.fsm.onRollback = vnodeOnRollback;
syncInfo.fsm.onRoleChanged = vnodeOnRoleChanged;
syncInfo.logStore.pData = pVnode;
syncInfo.logStore.logWrite = vnodeLogWrite;
syncInfo.logStore.logCommit = vnodeLogCommit;
syncInfo.logStore.logPrune = vnodeLogPrune;
syncInfo.logStore.logRollback = vnodeLogRollback;
syncInfo.stateManager.pData = pVnode;
syncInfo.stateManager.saveServerState = vnodeSaveServerState;
syncInfo.stateManager.readServerState = vnodeReadServerState;
pVnode->pSync = syncStart(&syncInfo);
if (pVnode->pSync == NULL) {
vnodeCleanupVnode(pVnode);
return terrno;
}
vnodeSetReadyStatus(pVnode);
return TSDB_CODE_SUCCESS;
......@@ -313,7 +397,7 @@ int32_t vnodeAlterVnode(SVnode * pVnode, SVnodeCfg *pCfg) {
}
if (syncChanged) {
// todo
syncReconfig(pVnode->pSync, &pVnode->cfg.sync);
}
vnodeRelease(pVnode);
......
......@@ -31,6 +31,7 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf
*vgId = htonl(pCreate->vgId);
pCfg->dropped = 0;
pCfg->quorum = pCreate->quorum;
tstrncpy(pCfg->db, pCreate->db, sizeof(pCfg->db));
pCfg->tsdb.cacheBlockSize = htonl(pCreate->cacheBlockSize);
......@@ -50,11 +51,11 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf
pCfg->wal.walLevel = pCreate->walLevel;
pCfg->sync.replica = pCreate->replica;
pCfg->sync.quorum = pCreate->quorum;
pCfg->sync.selfIndex = pCreate->selfIndex;
for (int32_t j = 0; j < pCreate->replica; ++j) {
pCfg->sync.nodes[j].nodePort = htons(pCreate->nodes[j].port);
tstrncpy(pCfg->sync.nodes[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN);
pCfg->sync.nodeInfo[j].nodePort = htons(pCreate->nodes[j].port);
tstrncpy(pCfg->sync.nodeInfo[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN);
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册