diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 78f4d076fc2e167510339d0f1669176afeacf271..ee805a2a0c1aa53a90a7685ba58f798843a2ae67 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -298,7 +298,7 @@ static bool dnodeReadMnodeInfos() { tsMnodeInfos.nodeInfos[i].syncPort = (uint16_t)syncPort->valueint; cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName"); - if (!nodeIp || nodeName->type != cJSON_String || nodeName->valuestring == NULL) { + if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) { dError("failed to read mnode mgmtIpList.json, nodeName not found"); goto PARSE_OVER; } @@ -310,7 +310,7 @@ static bool dnodeReadMnodeInfos() { dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId, - taosIpStr(tsMnodeInfos.nodeInfos[i].nodeId), tsMnodeInfos.nodeInfos[i].nodePort, + taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp), tsMnodeInfos.nodeInfos[i].nodePort, tsMnodeInfos.nodeInfos[i].nodeName); } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 7054cb8cd0c44e21befedb88a99a373ae6b36bd7..99209c734c72cca7a3e9c2d12d22dac1f1706fe9 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -129,22 +129,21 @@ static void dnodeCloseVnodes() { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; - pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); - pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); - pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize); - pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1); - pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2); - pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep); - pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); - pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock); - pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable); - pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks); - + pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); + pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables); + pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize); + pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock); + pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock); + pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1); + pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2); + pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep); + pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); + pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp); + for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { - pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId); - pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId); - pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip); + pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); + pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp); } return vnodeCreate(pCreate); @@ -159,9 +158,22 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; - pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); - pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); - pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); + pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables); + pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize); + pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock); + pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock); + pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1); + pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2); + pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep); + pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); + pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp); + + for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { + pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); + pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp); + } return 0; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index ef7cbc8eabe09616c650cd0ce6ae84288dc0202b..c3d745c7acf862ed79706d2dfa52d5ceb1c68ae0 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -245,12 +245,6 @@ typedef struct SSchema { int16_t bytes; } SSchema; -typedef struct { - int32_t vgId; - int32_t dnodeId; - uint32_t ip; -} SVnodeDesc; - typedef struct { int32_t contLen; int32_t vgId; @@ -521,9 +515,6 @@ typedef struct { uint8_t reserved[5]; } SVnodeLoad; -/* - * NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4 - */ typedef struct { char acct[TSDB_USER_LEN + 1]; char db[TSDB_DB_NAME_LEN + 1]; @@ -548,7 +539,7 @@ typedef struct { int8_t loadLatest; // load into mem or not uint8_t precision; // time resolution int8_t reserved[16]; -} SVnodeCfg, SDbCfg, SCMCreateDbMsg, SCMAlterDbMsg; +} SDbCfg, SCMCreateDbMsg, SCMAlterDbMsg; typedef struct { char db[TSDB_TABLE_ID_LEN + 1]; @@ -614,8 +605,35 @@ typedef struct { } SDMStatusRsp; typedef struct { - SVnodeCfg cfg; - SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; + uint32_t vgId; + int32_t maxTables; + int64_t maxCacheSize; + int32_t minRowsPerFileBlock; + int32_t maxRowsPerFileBlock; + int32_t daysPerFile; + int32_t daysToKeep; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t commitTime; + uint8_t precision; // time resolution + int8_t compression; + int8_t wals; + int8_t commitLog; + int8_t replications; + int8_t quorum; + uint32_t arbitratorIp; + int8_t reserved[16]; +} SMDVnodeCfg; + +typedef struct { + int32_t nodeId; + uint32_t nodeIp; + char nodeName[TSDB_NODE_NAME_LEN + 1]; +} SMDVnodeDesc; + +typedef struct { + SMDVnodeCfg cfg; + SMDVnodeDesc nodes[TSDB_MAX_MPEERS]; } SMDCreateVnodeMsg; typedef struct { @@ -673,9 +691,16 @@ typedef struct { int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM]; } SSuperTableMetaMsg; +typedef struct { + int32_t nodeId; + uint32_t nodeIp; + uint16_t nodePort; +} SVnodeDesc; + typedef struct { SVnodeDesc vpeerDesc[TSDB_REPLICA_MAX_NUM]; int16_t index; // used locally + int32_t vgId; int32_t numOfSids; int32_t pSidExtInfoList[]; // offset value of STableIdInfo } SVnodeSidList; diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 145a56130ffef7cfd1581f564bba0998b40c2b9a..dd38040147b1d71c8ea792415c6cee661963b9d6 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -444,7 +444,7 @@ static int32_t mgmtDropDnodeByIp(uint32_t ip) { return TSDB_CODE_NO_REMOVE_MASTER; } -#ifndef _VPEER +#ifndef _SYNC return mgmtDropDnode(pDnode); #else return balanceDropDnode(pDnode); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index eca052ba3470ee7ecf539db6cc876f03f04ada04..3ce4f41a5160de500fdf0edd5464afb46f528457 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -506,27 +506,37 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); if (pVnode == NULL) return NULL; - pVnode->cfg = pDb->cfg; - - SVnodeCfg *pCfg = &pVnode->cfg; - pCfg->vgId = htonl(pVgroup->vgId); - pCfg->maxSessions = htonl(pCfg->maxSessions); - pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize); - pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks); - pCfg->daysPerFile = htonl(pCfg->daysPerFile); - pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1); - pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); - pCfg->daysToKeep = htonl(pCfg->daysToKeep); - pCfg->commitTime = htonl(pCfg->commitTime); - pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - pCfg->blocksPerTable = htons(pCfg->blocksPerTable); - pCfg->replications = (int8_t) pVgroup->numOfVnodes; - - SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; + SMDVnodeCfg *pCfg = &pVnode->cfg; + pCfg->vgId = htonl(pVgroup->vgId); + pCfg->maxTables = htonl(pDb->cfg.maxSessions); + pCfg->maxCacheSize = htobe64((int64_t)pDb->cfg.cacheBlockSize * pDb->cfg.cacheNumOfBlocks.totalBlocks); + pCfg->maxCacheSize = htobe64(-1); + pCfg->minRowsPerFileBlock = htonl(-1); + pCfg->maxRowsPerFileBlock = htonl(-1); + pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile); + pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1); + pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2); + pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep); + pCfg->daysToKeep = htonl(-1); + pCfg->commitTime = htonl(pDb->cfg.commitTime); + pCfg->precision = pDb->cfg.precision; + pCfg->compression = pDb->cfg.compression; + pCfg->compression = -1; + pCfg->wals = 3; + pCfg->commitLog = pDb->cfg.commitLog; + pCfg->replications = (int8_t) pVgroup->numOfVnodes; + pCfg->quorum = 1; + pCfg->arbitratorIp = htonl(pVgroup->vnodeGid[0].privateIp); + + SMDVnodeDesc *pNodes = pVnode->nodes; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { - vpeerDesc[j].vgId = htonl(pVgroup->vgId); - vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId); - vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); + SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[j].dnodeId); + if (pDnode != NULL) { + pNodes[j].nodeId = htonl(pDnode->dnodeId); + pNodes[j].nodeIp = htonl(pDnode->privateIp); + strcpy(pNodes[j].nodeName, pDnode->dnodeName); + mgmtReleaseDnode(pDnode); + } } return pVnode; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 648188cdae62ae99798b532a94bdd6a1b4657fb2..8026b00b5b8099d090d48f7bdf957dcb7e8b724c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -11,7 +11,7 @@ #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) #define TSDB_MIN_ID 0 #define TSDB_MAX_ID INT_MAX -#define TSDB_MIN_TABLES 10 +#define TSDB_MIN_TABLES 4 #define TSDB_MAX_TABLES 100000 #define TSDB_DEFAULT_TABLES 1000 #define TSDB_DEFAULT_DAYS_PER_FILE 10 diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index 51065b8645fbc32403850d31cb4d92332b047892..6ceb83cb45f2e06cf46fb999a57025dc5453dc03 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -9,6 +9,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 9370e93683975e5d6636978198889aeface83a8c..029d4c8c84b350dc695cd5d7abf1d46fc0ef3182 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -18,10 +18,12 @@ #include "ihash.h" #include "taoserror.h" #include "taosmsg.h" +#include "tutil.h" #include "trpc.h" #include "tsdb.h" #include "ttime.h" #include "ttimer.h" +#include "cJSON.h" #include "twal.h" #include "tglobal.h" #include "dnode.h" @@ -93,21 +95,21 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { STsdbCfg tsdbCfg = {0}; tsdbCfg.precision = pVnodeCfg->cfg.precision; - tsdbCfg.compression = -1; + tsdbCfg.compression = pVnodeCfg->cfg.compression;; tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; - tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; + tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables; tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; - tsdbCfg.minRowsPerFileBlock = -1; - tsdbCfg.maxRowsPerFileBlock = -1; - tsdbCfg.keep = -1; - tsdbCfg.maxCacheSize = -1; + tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; + tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; + tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; + tsdbCfg.maxCacheSize = pVnodeCfg->cfg.maxCacheSize; char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); if (code != TSDB_CODE_SUCCESS) { - dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); - return terrno; + dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code)); + return TSDB_CODE_VG_INIT_FAILED; } dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog); @@ -328,88 +330,235 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { - char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; - sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId); - + char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId); FILE *fp = fopen(cfgFile, "w"); - if (!fp) return errno; + if (!fp) { + dError("vgId:%d, failed to open vnode cfg file for write, error:%s", pVnodeCfg->cfg.vgId, strerror(errno)); + return errno; + } + + char ipStr[20]; + int32_t len = 0; + int32_t maxLen = 1000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + + len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); + len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); + len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables); + len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile); + len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); + len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); + len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep); + + len += snprintf(content + len, maxLen - len, " \"maxCacheSize\": %" PRId64 ",\n", pVnodeCfg->cfg.maxCacheSize); + + len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog); + len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); + + uint32_t ipInt = pVnodeCfg->cfg.arbitratorIp; + sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24)); + len += snprintf(content + len, maxLen - len, " \"arbitratorIp\": \"%s\",\n", ipStr); - fprintf(fp, "commitLog %d\n", pVnodeCfg->cfg.commitLog); - fprintf(fp, "wals %d\n", 3); - fprintf(fp, "arbitratorIp %d\n", pVnodeCfg->vpeerDesc[0].ip); - fprintf(fp, "quorum %d\n", 1); - fprintf(fp, "replica %d\n", pVnodeCfg->cfg.replications); + len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); + len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); + + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { - fprintf(fp, "index%d nodeId %d nodeIp %u name n%d\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip, pVnodeCfg->vpeerDesc[i].dnodeId); + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId); + + uint32_t ipInt = pVnodeCfg->nodes[i].nodeIp; + sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24)); + len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", ipStr); + + len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", pVnodeCfg->nodes[i].nodeName); + + if (i < pVnodeCfg->cfg.replications - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } } + len += snprintf(content + len, maxLen - len, "}\n"); + fwrite(content, 1, len, fp); fclose(fp); - dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId); + free(content); - return TSDB_CODE_SUCCESS; + dPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId); + + return 0; } -// TODO: this is a simple implement static int32_t vnodeReadCfg(SVnodeObj *pVnode) { - char option[5][16] = {0}; - char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; - sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId); - + char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(cfgFile, "r"); - if (!fp) return errno; - - int32_t commitLog = -1; - int32_t num = fscanf(fp, "%s %d", option[0], &commitLog); - if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[0], "commitLog") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (commitLog == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->walCfg.commitLog = (int8_t)commitLog; - - int32_t wals = -1; - num = fscanf(fp, "%s %d", option[0], &wals); - if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[0], "wals") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (wals == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->walCfg.wals = (int8_t)wals; + if (!fp) { + dError("pVnode:%p vgId:%d, failed to open vnode cfg file for read, error:%s", pVnode, pVnode->vgId, strerror(errno)); + return errno; + } + + int ret = TSDB_CODE_OTHERS; + int maxLen = 1000; + char *content = calloc(1, maxLen + 1); + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + free(content); + fclose(fp); + dError("pVnode:%p vgId:%d, failed to vnode cfg, content is null", pVnode, pVnode->vgId); + return false; + } + + cJSON *root = cJSON_Parse(content); + if (root == NULL) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, invalid json format", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + + cJSON *precision = cJSON_GetObjectItem(root, "precision"); + if (!precision || precision->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, precision not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.precision = (int8_t)precision->valueint; + + cJSON *compression = cJSON_GetObjectItem(root, "compression"); + if (!compression || compression->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, compression not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.compression = (int8_t)compression->valueint; + + cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); + if (!maxTables || maxTables->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, maxTables not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.maxTables = maxTables->valueint; + + cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); + if (!daysPerFile || daysPerFile->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, daysPerFile not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint; + + cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); + if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, minRowsPerFileBlock not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint; + + cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); + if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, maxRowsPerFileBlock not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; + + cJSON *keep = cJSON_GetObjectItem(root, "keep"); + if (!keep || keep->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, keep not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.keep = keep->valueint; + + cJSON *maxCacheSize = cJSON_GetObjectItem(root, "maxCacheSize"); + if (!maxCacheSize || maxCacheSize->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, maxCacheSize not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->tsdbCfg.maxCacheSize = maxCacheSize->valueint; + + cJSON *commitLog = cJSON_GetObjectItem(root, "commitLog"); + if (!commitLog || commitLog->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, commitLog not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->walCfg.commitLog = (int8_t)commitLog->valueint; + + cJSON *wals = cJSON_GetObjectItem(root, "wals"); + if (!wals || wals->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, wals not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->walCfg.wals = (int8_t)wals->valueint; pVnode->walCfg.keep = 0; - int32_t arbitratorIp = -1; - num = fscanf(fp, "%s %u", option[0], &arbitratorIp); - if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->syncCfg.arbitratorIp = arbitratorIp; - - int32_t quorum = -1; - num = fscanf(fp, "%s %d", option[0], &quorum); - if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[0], "quorum") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (quorum == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->syncCfg.quorum = (int8_t)quorum; - - int32_t replica = -1; - num = fscanf(fp, "%s %d", option[0], &replica); - if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[0], "replica") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (replica == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->syncCfg.replica = (int8_t)replica; - - for (int32_t i = 0; i < replica; ++i) { - int32_t dnodeId = -1; - uint32_t dnodeIp = -1; - num = fscanf(fp, "%s %s %d %s %u %s %s", option[0], option[1], &dnodeId, option[2], &dnodeIp, option[3], pVnode->syncCfg.nodeInfo[i].name); - if (num != 7) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[1], "nodeId") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[2], "nodeIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[3], "name") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (dnodeId == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - if (dnodeIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->syncCfg.nodeInfo[i].nodeId = dnodeId; - pVnode->syncCfg.nodeInfo[i].nodeIp = dnodeIp; + cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp"); + if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId); + goto PARSE_OVER; } + pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring); - fclose(fp); - dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId); + cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); + if (!quorum || quorum->type != cJSON_Number) { + dError("failed to vnode cfg, quorum not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->syncCfg.quorum = (int8_t)quorum->valueint; - return TSDB_CODE_SUCCESS; + cJSON *replica = cJSON_GetObjectItem(root, "replica"); + if (!replica || replica->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, replica not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->syncCfg.replica = (int8_t)replica->valueint; + + cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeInfos not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + + int size = cJSON_GetArraySize(nodeInfos); + if (size != pVnode->syncCfg.replica) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeInfos size not matched", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + + for (int i = 0; i < size; ++i) { + cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeId not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint; + + cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp"); + if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeIp not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->syncCfg.nodeInfo[i].nodeIp = inet_addr(nodeIp->valuestring); + + cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName"); + if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) { + dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeName not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + strncpy(pVnode->syncCfg.nodeInfo[i].name, nodeName->valuestring, TSDB_NODE_NAME_LEN); + } + + ret = 0; + + dPrint("pVnode:%p vgId:%d, read vnode cfg successed, replcia:%d", pVnode, pVnode->vgId, pVnode->syncCfg.replica); + for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { + dPrint("pVnode:%p vgId:%d, dnode:%d, ip:%s name:%s", pVnode, pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId, + taosIpStr(pVnode->syncCfg.nodeInfo[i].nodeIp), pVnode->syncCfg.nodeInfo[i].name); + } + +PARSE_OVER: + free(content); + cJSON_Delete(root); + fclose(fp); + return ret; } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 6c4ada50fe86c272d7a96ec8fa7f698d9ba3e200..b1aa7c6382c9305ee34c92ebc901beb1e974ee9b 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -93,7 +93,7 @@ echo "privateIp $NODE_IP" >> $TAOS_CFG echo "dDebugFlag 199" >> $TAOS_CFG echo "mDebugFlag 199" >> $TAOS_CFG echo "sdbDebugFlag 199" >> $TAOS_CFG -echo "rpcDebugFlag 135" >> $TAOS_CFG +echo "rpcDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 131" >> $TAOS_CFG @@ -105,7 +105,7 @@ echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG echo "defaultPass taosdata" >> $TAOS_CFG echo "numOfLogLines 100000000" >> $TAOS_CFG echo "mgmtEqualVnodeNum 0" >> $TAOS_CFG -echo "clog 0" >> $TAOS_CFG +echo "clog 2" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG echo "numOfTotalVnodes 4" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG diff --git a/tests/script/test.sh b/tests/script/test.sh index bce6291fbe060a12c032b39ae7befee3e9c9ec22..b9660458b02ead666afd94ef82ed452a90eb021f 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -34,6 +34,8 @@ cd . sh/ip.sh -i 1 -s up > /dev/null 2>&1 & sh/ip.sh -i 2 -s up > /dev/null 2>&1 & sh/ip.sh -i 3 -s up > /dev/null 2>&1 & +sh/ip.sh -i 4 -s up > /dev/null 2>&1 & +sh/ip.sh -i 5 -s up > /dev/null 2>&1 & # Get responsible directories CODE_DIR=`dirname $0`