diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 50594fac0055d4f1847bdf7cd98ec47f383364c5..d571153c1a03d0253522e2c5a3fefeeffb216022 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -721,6 +721,8 @@ typedef struct { int32_t daysToKeep2; int32_t minRowsPerFileBlock; int32_t maxRowsPerFileBlock; + int32_t fsyncPeriod; + int8_t reserved[16]; int8_t precision; int8_t compression; int8_t cacheLastRow; @@ -728,8 +730,7 @@ typedef struct { int8_t walLevel; int8_t replica; int8_t quorum; - int8_t reserved[9]; - int32_t fsyncPeriod; + int8_t selfIndex; SVnodeDesc nodes[TSDB_MAX_REPLICA]; } SCreateVnodeMsg, SAlterVnodeMsg; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 30583686c5e31c276e0761369cf7c8b0106947c2..e8a8dee866507b550a9e010c516dce52709e3b2d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -23,9 +23,9 @@ extern "C" { #include #include "taosdef.h" -typedef int64_t SyncNodeId; -typedef int32_t SyncGroupId; -typedef int64_t SyncIndex; +typedef int32_t SyncNodeId; +typedef int32_t SyncGroupId; +typedef int64_t SyncIndex; typedef uint64_t SSyncTerm; typedef enum { @@ -41,21 +41,21 @@ typedef struct { typedef struct { SyncNodeId nodeId; - uint16_t nodePort; // node sync Port - char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; typedef struct { - int selfIndex; - int nNode; - SNodeInfo* nodeInfo; + int selfIndex; + int replica; + SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCluster; typedef struct { - int32_t selfIndex; - int nNode; - SNodeInfo* node; - ESyncRole* role; + int32_t selfIndex; + int replica; + SNodeInfo node[TSDB_MAX_REPLICA]; + ESyncRole role[TSDB_MAX_REPLICA]; } SNodesRole; typedef struct SSyncFSM { @@ -101,13 +101,13 @@ typedef struct SSyncLogStore { typedef struct SSyncServerState { SyncNodeId voteFor; - SSyncTerm term; + SSyncTerm term; } SSyncServerState; typedef struct SSyncClusterConfig { // Log index number of current cluster config. SyncIndex index; - + // Log index number of previous cluster config. SyncIndex prevIndex; @@ -122,21 +122,17 @@ typedef struct SStateManager { const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); - void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); + // void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); - const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); + // const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); } SStateManager; typedef struct { - SyncGroupId vgId; - - SyncIndex snapshotIndex; - SSyncCluster syncCfg; - - SSyncFSM fsm; - + SyncGroupId vgId; + SyncIndex snapshotIndex; + SSyncCluster syncCfg; + SSyncFSM fsm; SSyncLogStore logStore; - SStateManager stateManager; } SSyncInfo; @@ -146,19 +142,20 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -SSyncNode syncStart(const SSyncInfo*); -void syncStop(SyncNodeId); +SSyncNode* syncStart(const SSyncInfo*); +void syncReconfig(const SSyncNode*, const SSyncCluster*); +void syncStop(const SSyncNode*); -int32_t syncPropose(SSyncNode syncNode, SSyncBuffer buffer, void* pData, bool isWeak); +int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak); -int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); +//int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); -int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); +//int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); -extern int32_t syncDebugFlag; +extern int32_t syncDebugFlag; #ifdef __cplusplus } #endif -#endif /*_TD_LIBS_SYNC_H*/ +#endif /*_TD_LIBS_SYNC_H*/ diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index 4b3ca11e4b00039e533ca4a53326f83aa39d6dbd..879f2d4f6d800d859e344bd5b42651fdc92c8288 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -15,5 +15,12 @@ #include "sync.h" -int32_t syncInit() {return 0;} -void syncCleanUp() {} \ No newline at end of file +int32_t syncInit() { return 0; } + +void syncCleanUp() {} + +SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; } + +void syncStop(const SSyncNode* pNode) {} + +void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {} \ No newline at end of file diff --git a/source/server/mnode/inc/mnodeInt.h b/source/server/mnode/inc/mnodeInt.h index 42d3c53fa2a5242e3b663032aea5090dd05e6172..0ce47cbe36f314a50ee0bef3e91b0411958b23a8 100644 --- a/source/server/mnode/inc/mnodeInt.h +++ b/source/server/mnode/inc/mnodeInt.h @@ -24,7 +24,7 @@ extern "C" { tmr_h mnodeGetTimer(); int32_t mnodeGetDnodeId(); -char *mnodeGetClusterId(); +int64_t mnodeGetClusterId(); EMnStatus mnodeGetStatus(); void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); diff --git a/source/server/mnode/src/mnodeTelem.c b/source/server/mnode/src/mnodeTelem.c index cb292342c7e86d751ce3fa73f38ce0eb396f76ec..8b8e4f9ce0931637fb91f9798fd2e79d42d9729d 100644 --- a/source/server/mnode/src/mnodeTelem.c +++ b/source/server/mnode/src/mnodeTelem.c @@ -202,12 +202,13 @@ static void mnodeSendTelemetryReport() { return; } - char clusterId[TSDB_CLUSTER_ID_LEN] = {0}; - mnodeGetClusterId(clusterId); + int64_t clusterId = mnodeGetClusterId(); + char clusterIdStr[20] = {0}; + snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId); SBufferWriter bw = tbufInitWriter(NULL, false); mnodeBeginObject(&bw); - mnodeAddStringField(&bw, "instanceId", clusterId); + mnodeAddStringField(&bw, "instanceId", clusterIdStr); mnodeAddIntField(&bw, "reportVersion", 1); mnodeAddOsInfo(&bw); mnodeAddCpuInfo(&bw); diff --git a/source/server/mnode/src/mondeInt.c b/source/server/mnode/src/mondeInt.c index 37af26f6042830c3320b2bdd1120db08d92cb374..343384ba6742314000bb57818b6049e9be536dad 100644 --- a/source/server/mnode/src/mondeInt.c +++ b/source/server/mnode/src/mondeInt.c @@ -39,7 +39,7 @@ static struct { int32_t state; int32_t dnodeId; - char clusterId[TSDB_CLUSTER_ID_LEN]; + int64_t clusterId; tmr_h timer; SMnodeFp fp; SSteps * steps1; @@ -50,7 +50,7 @@ tmr_h mnodeGetTimer() { return tsMint.timer; } int32_t mnodeGetDnodeId() { return tsMint.dnodeId; } -char *mnodeGetClusterId() { return tsMint.clusterId; } +int64_t mnodeGetClusterId() { return tsMint.clusterId; } EMnStatus mnodeGetStatus() { return tsMint.state; } @@ -71,12 +71,14 @@ int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; } static int32_t mnodeSetPara(SMnodePara para) { tsMint.fp = para.fp; tsMint.dnodeId = para.dnodeId; - strncpy(tsMint.clusterId, para.clusterId, TSDB_CLUSTER_ID_LEN); + tsMint.clusterId = para.clusterId; if (tsMint.fp.SendMsgToDnode == NULL) return -1; if (tsMint.fp.SendMsgToMnode == NULL) return -1; if (tsMint.fp.SendRedirectMsg == NULL) return -1; + if (tsMint.fp.GetDnodeEp == NULL) return -1; if (tsMint.dnodeId < 0) return -1; + if (tsMint.clusterId < 0) return -1; return 0; } @@ -141,7 +143,7 @@ static void mnodeCleanupStep2() { taosStepCleanup(tsMint.steps2); } static bool mnodeNeedDeploy() { if (tsMint.dnodeId > 0) return false; - if (tsMint.clusterId[0] != 0) return false; + if (tsMint.clusterId > 0) return false; if (strcmp(tsFirst, tsLocalEp) != 0) return false; return true; } @@ -154,7 +156,7 @@ int32_t mnodeDeploy() { tsMint.state = MN_STATUS_INIT; } - if (tsMint.dnodeId <= 0 || tsMint.clusterId[0] == 0) { + if (tsMint.dnodeId <= 0 || tsMint.clusterId <= 0) { mError("failed to deploy mnode since cluster not ready"); return TSDB_CODE_MND_NOT_READY; } diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index 3c7487f68149e56727102e95c634b6821c534b81..ac6c77041f42f2dda3b5f4898a18a67136183aa5 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -62,19 +62,14 @@ typedef struct STsdbCfg { typedef struct SMetaCfg { } SMetaCfg; -typedef struct SSyncCluster { - int8_t replica; - int8_t quorum; - SNodeInfo nodes[TSDB_MAX_REPLICA]; -} SSyncCfg; - typedef struct SVnodeCfg { - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - int8_t dropped; - SWalCfg wal; - STsdbCfg tsdb; - SMetaCfg meta; - SSyncCfg sync; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + int8_t dropped; + int8_t quorum; + SWalCfg wal; + STsdbCfg tsdb; + SMetaCfg meta; + SSyncCluster sync; } SVnodeCfg; typedef struct { @@ -86,7 +81,7 @@ typedef struct { STQ *pTQ; twalh pWal; void *pQuery; - SyncNodeId syncNode; + SSyncNode *pSync; taos_queue pWriteQ; // write queue taos_queue pQueryQ; // read query queue taos_queue pFetchQ; // read fetch/cancel queue diff --git a/source/server/vnode/src/vnodeFile.c b/source/server/vnode/src/vnodeFile.c index 9835e3e0fbcb8e5604a612bc7f2832bbb326538f..a77c99ec3442da097c1ec5abe641a6773eeda12e 100644 --- a/source/server/vnode/src/vnodeFile.c +++ b/source/server/vnode/src/vnodeFile.c @@ -30,149 +30,156 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) { fp = fopen(file, "r"); if (!fp) { - vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", vgId, file, strerror(errno)); + vError("vgId:%d, failed to open vnode cfg file:%s to read since %s", vgId, file, strerror(errno)); ret = TAOS_SYSTEM_ERROR(errno); goto PARSE_VCFG_ERROR; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - vError("vgId:%d, failed to read %s, content is null", vgId, file); + vError("vgId:%d, failed to read %s since content is null", vgId, file); goto PARSE_VCFG_ERROR; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - vError("vgId:%d, failed to read %s, invalid json format", vgId, file); + vError("vgId:%d, failed to read %s since invalid json format", vgId, file); goto PARSE_VCFG_ERROR; } cJSON *db = cJSON_GetObjectItem(root, "db"); if (!db || db->type != cJSON_String || db->valuestring == NULL) { - vError("vgId:%d, failed to read %s, db not found", vgId, file); + vError("vgId:%d, failed to read %s since db not found", vgId, file); goto PARSE_VCFG_ERROR; } tstrncpy(pCfg->db, db->valuestring, sizeof(pCfg->db)); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, dropped not found", vgId, file); + vError("vgId:%d, failed to read %s since dropped not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->dropped = (int32_t)dropped->valueint; + cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); + if (!quorum || quorum->type != cJSON_Number) { + vError("vgId: %d, failed to read %s, quorum not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + pCfg->quorum = (int8_t)quorum->valueint; + cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, cacheBlockSize not found", vgId, file); + vError("vgId:%d, failed to read %s since cacheBlockSize not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.cacheBlockSize = (int32_t)cacheBlockSize->valueint; cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); if (!totalBlocks || totalBlocks->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, totalBlocks not found", vgId, file); + vError("vgId:%d, failed to read %s since totalBlocks not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.totalBlocks = (int32_t)totalBlocks->valueint; cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); if (!daysPerFile || daysPerFile->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysPerFile not found", vgId, file); + vError("vgId:%d, failed to read %s since daysPerFile not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.daysPerFile = (int32_t)daysPerFile->valueint; cJSON *daysToKeep0 = cJSON_GetObjectItem(root, "daysToKeep0"); if (!daysToKeep0 || daysToKeep0->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysToKeep0 not found", vgId, file); + vError("vgId:%d, failed to read %s since daysToKeep0 not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.daysToKeep0 = (int32_t)daysToKeep0->valueint; cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysToKeep1 not found", vgId, file); + vError("vgId:%d, failed to read %s since daysToKeep1 not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.daysToKeep1 = (int32_t)daysToKeep1->valueint; cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, daysToKeep2 not found", vgId, file); + vError("vgId:%d, failed to read %s since daysToKeep2 not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.daysToKeep2 = (int32_t)daysToKeep2->valueint; cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", vgId, file); + vError("vgId:%d, failed to read %s since minRowsPerFileBlock not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint; cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", vgId, file); + vError("vgId:%d, failed to read %s since maxRowsPerFileBlock not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint; cJSON *precision = cJSON_GetObjectItem(root, "precision"); if (!precision || precision->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, precision not found", vgId, file); + vError("vgId:%d, failed to read %s since precision not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.precision = (int8_t)precision->valueint; cJSON *compression = cJSON_GetObjectItem(root, "compression"); if (!compression || compression->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, compression not found", vgId, file); + vError("vgId:%d, failed to read %s since compression not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.compression = (int8_t)compression->valueint; cJSON *update = cJSON_GetObjectItem(root, "update"); if (!update || update->type != cJSON_Number) { - vError("vgId: %d, failed to read %s, update not found", vgId, file); + vError("vgId: %d, failed to read %s since update not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.update = (int8_t)update->valueint; cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { - vError("vgId: %d, failed to read %s, cacheLastRow not found", vgId, file); + vError("vgId: %d, failed to read %s since cacheLastRow not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->tsdb.cacheLastRow = (int8_t)cacheLastRow->valueint; cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel"); if (!walLevel || walLevel->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, walLevel not found", vgId, file); + vError("vgId:%d, failed to read %s since walLevel not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->wal.walLevel = (int8_t)walLevel->valueint; cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsyncPeriod"); if (!walLevel || walLevel->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, fsyncPeriod not found", vgId, file); + vError("vgId:%d, failed to read %s since fsyncPeriod not found", vgId, file); goto PARSE_VCFG_ERROR; } pCfg->wal.fsyncPeriod = (int32_t)fsyncPeriod->valueint; - cJSON *replica = cJSON_GetObjectItem(root, "replica"); - if (!replica || replica->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, replica not found", vgId, file); + cJSON *selfIndex = cJSON_GetObjectItem(root, "selfIndex"); + if (!selfIndex || selfIndex->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since selfIndex not found", vgId, file); goto PARSE_VCFG_ERROR; } - pCfg->sync.replica = (int8_t)replica->valueint; + pCfg->sync.selfIndex = selfIndex->valueint; - cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); - if (!quorum || quorum->type != cJSON_Number) { - vError("vgId: %d, failed to read %s, quorum not found", vgId, file); + cJSON *replica = cJSON_GetObjectItem(root, "replica"); + if (!replica || replica->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since replica not found", vgId, file); goto PARSE_VCFG_ERROR; } - pCfg->sync.quorum = (int8_t)quorum->valueint; + pCfg->sync.replica = replica->valueint; cJSON *nodes = cJSON_GetObjectItem(root, "nodes"); if (!nodes || nodes->type != cJSON_Array) { @@ -182,28 +189,35 @@ int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg) { int size = cJSON_GetArraySize(nodes); if (size != pCfg->sync.replica) { - vError("vgId:%d, failed to read %s, nodes size not matched", vgId, file); + vError("vgId:%d, failed to read %s since nodes size not matched", vgId, file); goto PARSE_VCFG_ERROR; } for (int i = 0; i < size; ++i) { cJSON *nodeInfo = cJSON_GetArrayItem(nodes, i); if (nodeInfo == NULL) continue; - SNodeInfo *node = &pCfg->sync.nodes[i]; + SNodeInfo *node = &pCfg->sync.nodeInfo[i]; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "id"); + if (!nodeId || nodeId->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since nodeId not found", vgId, file); + goto PARSE_VCFG_ERROR; + } + node->nodeId = nodeId->valueint; - cJSON *port = cJSON_GetObjectItem(nodeInfo, "port"); - if (!port || port->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, port not found", vgId, file); + cJSON *nodePort = cJSON_GetObjectItem(nodeInfo, "port"); + if (!nodePort || nodePort->type != cJSON_Number) { + vError("vgId:%d, failed to read %s sincenodePort not found", vgId, file); goto PARSE_VCFG_ERROR; } - node->nodePort = (uint16_t)port->valueint; + node->nodePort = (uint16_t)nodePort->valueint; - cJSON *fqdn = cJSON_GetObjectItem(nodeInfo, "fqdn"); - if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - vError("vgId:%d, failed to read %s, fqdn not found", vgId, file); + cJSON *nodeFqdn = cJSON_GetObjectItem(nodeInfo, "fqdn"); + if (!nodeFqdn || nodeFqdn->type != cJSON_String || nodeFqdn->valuestring == NULL) { + vError("vgId:%d, failed to read %s since nodeFqdn not found", vgId, file); goto PARSE_VCFG_ERROR; } - tstrncpy(node->nodeFqdn, fqdn->valuestring, TSDB_FQDN_LEN); + tstrncpy(node->nodeFqdn, nodeFqdn->valuestring, TSDB_FQDN_LEN); } ret = TSDB_CODE_SUCCESS; @@ -238,6 +252,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", vgId); len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pCfg->db); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pCfg->dropped); + len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->quorum); // tsdb len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pCfg->tsdb.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pCfg->tsdb.totalBlocks); @@ -255,11 +270,12 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pCfg->wal.walLevel); len += snprintf(content + len, maxLen - len, " \"fsyncPeriod\": %d,\n", pCfg->wal.fsyncPeriod); // sync - len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pCfg->sync.quorum); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pCfg->sync.replica); + len += snprintf(content + len, maxLen - len, " \"selfIndex\": %d,\n", pCfg->sync.selfIndex); len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n"); for (int32_t i = 0; i < pCfg->sync.replica; i++) { - SNodeInfo *node = &pCfg->sync.nodes[i]; + SNodeInfo *node = &pCfg->sync.nodeInfo[i]; + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", node->nodeId); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", node->nodePort); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\"\n", node->nodeFqdn); if (i < pCfg->sync.replica - 1) { @@ -304,20 +320,20 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { } cJSON *term = cJSON_GetObjectItem(root, "term"); - if (!term || term->type != cJSON_Number) { + if (!term || term->type != cJSON_String) { vError("vgId:%d, failed to read %s since term not found", vgId, file); goto PARSE_TERM_ERROR; } - pState->term = (uint64_t)term->valueint; + pState->term = atoll(term->valuestring); cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor"); - if (!voteFor || voteFor->type != cJSON_Number) { + if (!voteFor || voteFor->type != cJSON_String) { vError("vgId:%d, failed to read %s since voteFor not found", vgId, file); goto PARSE_TERM_ERROR; } - pState->voteFor = (int64_t)voteFor->valueint; + pState->voteFor = atoi(voteFor->valuestring); - vInfo("vgId:%d, read %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); + vInfo("vgId:%d, read %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term); PARSE_TERM_ERROR: if (content != NULL) free(content); @@ -342,8 +358,8 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"term\": %" PRIu64 "\n", pState->term); - len += snprintf(content + len, maxLen - len, " \"voteFor\": %" PRIu64 "\n", pState->voteFor); + len += snprintf(content + len, maxLen - len, " \"term\": \"%" PRIu64 "\",\n", pState->term); + len += snprintf(content + len, maxLen - len, " \"voteFor\": \"%d\"\n", pState->voteFor); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -351,6 +367,6 @@ int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { fclose(fp); free(content); - vInfo("vgId:%d, write %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); + vInfo("vgId:%d, write %s success, voteFor:%d, term:%" PRIu64, vgId, file, pState->voteFor, pState->term); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index 5143f04c5b4b18f0fcfc2e0ce439b9f5a5b05734..c08ae7708a1a5b7179879d6830d7550528aae6d5 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -108,6 +108,11 @@ static void vnodeDestroyVnode(SVnode *pVnode) { int32_t code = 0; int32_t vgId = pVnode->vgId; + if (pVnode->pSync != NULL) { + syncStop(pVnode->pSync); + pVnode->pSync = NULL; + } + if (pVnode->pQuery) { // todo } @@ -177,6 +182,9 @@ static int32_t vnodeOpenVnode(int32_t vgId) { pVnode->role = TAOS_SYNC_ROLE_CANDIDATE; pthread_mutex_init(&pVnode->statusMutex, NULL); + vDebug("vgId:%d, vnode is opened", pVnode->vgId); + taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *)); + code = vnodeReadCfg(vgId, &pVnode->cfg); if (code != TSDB_CODE_SUCCESS) { vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId); @@ -209,8 +217,34 @@ static int32_t vnodeOpenVnode(int32_t vgId) { return terrno; } - vDebug("vgId:%d, vnode is opened", pVnode->vgId); - taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *)); + // create sync node + SSyncInfo syncInfo = {0}; + syncInfo.vgId = vgId; + syncInfo.snapshotIndex = 0; // todo, from tsdb + memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster)); + syncInfo.fsm.pData = pVnode; + syncInfo.fsm.applyLog = NULL; + syncInfo.fsm.onClusterChanged = NULL; + syncInfo.fsm.getSnapshot = NULL; + syncInfo.fsm.applySnapshot = NULL; + syncInfo.fsm.onRestoreDone = NULL; + syncInfo.fsm.onRollback = NULL; + syncInfo.logStore.pData = pVnode; + syncInfo.logStore.logWrite = NULL; + syncInfo.logStore.logCommit = NULL; + syncInfo.logStore.logPrune = NULL; + syncInfo.logStore.logRollback = NULL; + syncInfo.stateManager.pData = pVnode; + syncInfo.stateManager.saveServerState = NULL; + syncInfo.stateManager.readServerState = NULL; + // syncInfo.stateManager.saveCluster = NULL; + // syncInfo.stateManager.readCluster = NULL; + + pVnode->pSync = syncStart(&syncInfo); + if (pVnode->pSync == NULL) { + vnodeCleanupVnode(pVnode); + return terrno; + } vnodeSetReadyStatus(pVnode); return TSDB_CODE_SUCCESS; @@ -313,7 +347,7 @@ int32_t vnodeAlterVnode(SVnode * pVnode, SVnodeCfg *pCfg) { } if (syncChanged) { - // todo + syncReconfig(pVnode->pSync, &pVnode->cfg.sync); } vnodeRelease(pVnode); diff --git a/source/server/vnode/src/vnodeMgmt.c b/source/server/vnode/src/vnodeMgmt.c index d20e36641e2eb8cdda9b48f8fca2a32ab9850183..e0e76d5b566d00122e0e5dd81dcbcddab2774aff 100644 --- a/source/server/vnode/src/vnodeMgmt.c +++ b/source/server/vnode/src/vnodeMgmt.c @@ -31,6 +31,7 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf *vgId = htonl(pCreate->vgId); pCfg->dropped = 0; + pCfg->quorum = pCreate->quorum; tstrncpy(pCfg->db, pCreate->db, sizeof(pCfg->db)); pCfg->tsdb.cacheBlockSize = htonl(pCreate->cacheBlockSize); @@ -50,11 +51,11 @@ static int32_t vnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCf pCfg->wal.walLevel = pCreate->walLevel; pCfg->sync.replica = pCreate->replica; - pCfg->sync.quorum = pCreate->quorum; - + pCfg->sync.selfIndex = pCreate->selfIndex; + for (int32_t j = 0; j < pCreate->replica; ++j) { - pCfg->sync.nodes[j].nodePort = htons(pCreate->nodes[j].port); - tstrncpy(pCfg->sync.nodes[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN); + pCfg->sync.nodeInfo[j].nodePort = htons(pCreate->nodes[j].port); + tstrncpy(pCfg->sync.nodeInfo[j].nodeFqdn, pCreate->nodes[j].fqdn, TSDB_FQDN_LEN); } return 0;