From 2861c026404d3a88f1c349cb32d8e025ec052cd2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 26 Oct 2020 12:45:49 +0000 Subject: [PATCH] TD-1762 --- src/balance/src/balance.c | 2 +- src/client/src/tscServer.c | 10 +- src/dnode/inc/dnodeCfg.h | 33 +++ src/dnode/inc/dnodeEps.h | 35 ++++ src/dnode/inc/dnodeMInfos.h | 36 ++++ src/dnode/inc/dnodeMgmt.h | 6 +- src/dnode/src/dnodeCfg.c | 157 ++++++++++++++ src/dnode/src/dnodeCheck.c | 6 +- src/dnode/src/dnodeEps.c | 274 ++++++++++++++++++++++++ src/dnode/src/dnodeMInfos.c | 277 +++++++++++++++++++++++++ src/dnode/src/dnodeMain.c | 23 ++- src/dnode/src/dnodeMgmt.c | 380 +++------------------------------- src/dnode/src/dnodeModule.c | 4 +- src/dnode/src/dnodePeer.c | 6 +- src/inc/dnode.h | 12 +- src/inc/taosmsg.h | 49 +++-- src/mnode/src/mnodeDnode.c | 66 +++++- src/mnode/src/mnodeMnode.c | 29 +-- src/mnode/src/mnodeSdb.c | 10 +- src/mnode/src/mnodeShow.c | 8 +- src/vnode/inc/vnodeCfg.h | 30 +++ src/vnode/inc/vnodeVersion.h | 30 +++ src/vnode/src/vnodeCfg.c | 322 +++++++++++++++++++++++++++++ src/vnode/src/vnodeMain.c | 391 ++--------------------------------- src/vnode/src/vnodeVersion.c | 102 +++++++++ 25 files changed, 1495 insertions(+), 803 deletions(-) create mode 100644 src/dnode/inc/dnodeCfg.h create mode 100644 src/dnode/inc/dnodeEps.h create mode 100644 src/dnode/inc/dnodeMInfos.h create mode 100644 src/dnode/src/dnodeCfg.c create mode 100644 src/dnode/src/dnodeEps.c create mode 100644 src/dnode/src/dnodeMInfos.c create mode 100644 src/vnode/inc/vnodeCfg.h create mode 100644 src/vnode/inc/vnodeVersion.h create mode 100644 src/vnode/src/vnodeCfg.c create mode 100644 src/vnode/src/vnodeVersion.c diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 0e9bb85b25..4c687cb134 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -490,7 +490,7 @@ static bool balanceMontiorDropping() { if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { if (pDnode->lastAccess + tsOfflineThreshold > tsAccessSquence) continue; - if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) continue; + if (dnodeIsMasterEp(pDnode->dnodeEp)) continue; if (mnodeGetDnodesNum() <= 1) continue; mLInfo("dnode:%d, set to removing state for it offline:%d seconds", pDnode->dnodeId, diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a0841fa234..473c1101e5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -922,13 +922,13 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; - pCmd->payloadLen = sizeof(SCMCreateAcctMsg); + pCmd->payloadLen = sizeof(SCreateAcctMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload; + SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload; SStrToken *pName = &pInfo->pDCLInfo->user.user; SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd; @@ -1461,14 +1461,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { STscObj *pObj = pSql->pTscObj; SSqlCmd *pCmd = &pSql->cmd; pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT; - pCmd->payloadLen = sizeof(SCMConnectMsg); + pCmd->payloadLen = sizeof(SConnectMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { tscError("%p failed to malloc for query msg", pSql); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; + SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload; // TODO refactor full_name char *db; // ugly code to move the space @@ -1987,7 +1987,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; - SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp; + SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp; tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); diff --git a/src/dnode/inc/dnodeCfg.h b/src/dnode/inc/dnodeCfg.h new file mode 100644 index 0000000000..35d8896460 --- /dev/null +++ b/src/dnode/inc/dnodeCfg.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_CFG_H +#define TDENGINE_DNODE_CFG_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitCfg(); +void dnodeCleanupCfg(); +void dnodeUpdateCfg(SDnodeCfg *cfg); +int32_t dnodeGetDnodeId(); +void dnodeGetCfg(int32_t *dnodeId, char *clusterId); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeEps.h b/src/dnode/inc/dnodeEps.h new file mode 100644 index 0000000000..2a203498c1 --- /dev/null +++ b/src/dnode/inc/dnodeEps.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_EP_H +#define TDENGINE_DNODE_EP_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "taosmsg.h" + +int32_t dnodeInitEps(); +void dnodeCleanupEps(); +void dnodeUpdateEps(SDnodeEps *eps); +void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); +bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeMInfos.h b/src/dnode/inc/dnodeMInfos.h new file mode 100644 index 0000000000..9c3c85c47e --- /dev/null +++ b/src/dnode/inc/dnodeMInfos.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MINFOS_H +#define TDENGINE_DNODE_MINFOS_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "taosmsg.h" + +int32_t dnodeInitMInfos(); +void dnodeCleanupMInfos(); +void dnodeUpdateMInfos(SMnodeInfos *minfos); +void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet); +void dnodeGetMInfos(SMnodeInfos *minfos); +bool dnodeIsMasterEp(char *ep); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index e8f4a0823f..2038ef5286 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "trpc.h" + int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); int32_t dnodeInitMgmtTimer(); @@ -35,8 +37,8 @@ void* dnodeGetVnodeTsdb(void *pVnode); void dnodeReleaseVnode(void *pVnode); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); -void dnodeGetMnodeEpSetForPeer(void *epSet); -void dnodeGetMnodeEpSetForShell(void *epSet); +void dnodeGetEpSetForPeer(SRpcEpSet *epSet); +void dnodeGetEpSetForShell(SRpcEpSet *epSet); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeCfg.c b/src/dnode/src/dnodeCfg.c new file mode 100644 index 0000000000..19ebde163d --- /dev/null +++ b/src/dnode/src/dnodeCfg.c @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "cJSON.h" +#include "tglobal.h" +#include "dnode.h" +#include "dnodeInt.h" +#include "dnodeCfg.h" + +static SDnodeCfg tsCfg = {0}; +static pthread_mutex_t tsCfgMutex; + +static int32_t dnodeReadCfg(); +static int32_t dnodeWriteCfg(); +static void dnodeResetCfg(SDnodeCfg *cfg); +static void dnodePrintCfg(SDnodeCfg *cfg); + +int32_t dnodeInitCfg() { + pthread_mutex_init(&tsCfgMutex, NULL); + dnodeResetCfg(NULL); + return dnodeReadCfg(); +} + +void dnodeCleanupCfg() { pthread_mutex_destroy(&tsCfgMutex); } + +void dnodeUpdateCfg(SDnodeCfg *cfg) { + if (tsCfg.dnodeId != 0) return; + dnodeResetCfg(cfg); +} + +int32_t dnodeGetDnodeId() { + int32_t dnodeId = 0; + pthread_mutex_lock(&tsCfgMutex); + dnodeId = tsCfg.dnodeId; + pthread_mutex_unlock(&tsCfgMutex); + return dnodeId; +} + +void dnodeGetCfg(int32_t *dnodeId, char *clusterId) { + pthread_mutex_lock(&tsCfgMutex); + *dnodeId = tsCfg.dnodeId; + tstrncpy(clusterId, tsCfg.clusterId, TSDB_CLUSTER_ID_LEN); + pthread_mutex_unlock(&tsCfgMutex); +} + +static void dnodeResetCfg(SDnodeCfg *cfg) { + if (cfg == NULL) return; + if (cfg->dnodeId == 0) return; + + pthread_mutex_lock(&tsCfgMutex); + tsCfg.dnodeId = cfg->dnodeId; + tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); + dnodePrintCfg(cfg); + dnodeWriteCfg(); + pthread_mutex_unlock(&tsCfgMutex); +} + +static void dnodePrintCfg(SDnodeCfg *cfg) { + dInfo("dnodeId is set to %d, clusterId is set to %s", cfg->dnodeId, cfg->clusterId); +} + +static int32_t dnodeReadCfg() { + int32_t len = 0; + int32_t maxLen = 200; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; + SDnodeCfg cfg = {0}; + + char file[TSDB_FILENAME_LEN + 20] = {0}; + sprintf(file, "%s/dnodeCfg.json", tsDnodeDir); + + fp = fopen(file, "r"); + if (!fp) { + dDebug("failed to read %s, file not exist", file); + goto PARSE_CFG_OVER; + } + + len = fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s, content is null", file); + goto PARSE_CFG_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s, invalid json format", file); + goto PARSE_CFG_OVER; + } + + cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_Number) { + dError("failed to read %s, dnodeId not found", file); + goto PARSE_CFG_OVER; + } + cfg.dnodeId = dnodeId->valueint; + + cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); + if (!clusterId || clusterId->type != cJSON_String) { + dError("failed to read %s, clusterId not found", file); + goto PARSE_CFG_OVER; + } + tstrncpy(cfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN); + + dInfo("read file %s successed", file); + +PARSE_CFG_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + dnodeResetCfg(&cfg); + return 0; +} + +static int32_t dnodeWriteCfg() { + char file[TSDB_FILENAME_LEN + 20] = {0}; + sprintf(file, "%s/dnodeCfg.json", tsDnodeDir); + + FILE *fp = fopen(file, "w"); + if (!fp) { + dError("failed to write %s, reason:%s", file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 200; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fflush(fp); + fclose(fp); + free(content); + + dInfo("successed to write %s", file); + return 0; +} diff --git a/src/dnode/src/dnodeCheck.c b/src/dnode/src/dnodeCheck.c index 9b68fc1f6c..b68c22df33 100644 --- a/src/dnode/src/dnodeCheck.c +++ b/src/dnode/src/dnodeCheck.c @@ -15,9 +15,7 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosdef.h" #include "tglobal.h" -#include "mnode.h" #include "dnodeInt.h" #include "dnodeCheck.h" @@ -30,8 +28,8 @@ typedef struct { void (*stopFp)(); } SCheckItem; -static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}}; -int64_t tsMinFreeMemSizeForStart = 0; +static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}}; +int64_t tsMinFreeMemSizeForStart = 0; static int bindTcpPort(int port) { int serverSocket; diff --git a/src/dnode/src/dnodeEps.c b/src/dnode/src/dnodeEps.c new file mode 100644 index 0000000000..a97bf02cca --- /dev/null +++ b/src/dnode/src/dnodeEps.c @@ -0,0 +1,274 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "cJSON.h" +#include "tglobal.h" +#include "hash.h" +#include "dnode.h" +#include "dnodeInt.h" +#include "dnodeEps.h" + +static SDnodeEps *tsEps = NULL; +static SHashObj * tsEpsHash = NULL; +static pthread_mutex_t tsEpsMutex; + +static int32_t dnodeReadEps(); +static int32_t dnodeWriteEps(); +static void dnodeResetEps(SDnodeEps *eps); +static void dnodePrintEps(SDnodeEps *eps); + +int32_t dnodeInitEps() { + pthread_mutex_init(&tsEpsMutex, NULL); + tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); + dnodeResetEps(NULL); + return dnodeReadEps(); +} + +void dnodeCleanupEps() { + pthread_mutex_lock(&tsEpsMutex); + if (tsEps) { + free(tsEps); + tsEps = NULL; + } + if (tsEpsHash) { + taosHashCleanup(tsEpsHash); + tsEpsHash = NULL; + } + pthread_mutex_unlock(&tsEpsMutex); + pthread_mutex_destroy(&tsEpsMutex); +} + +void dnodeUpdateEps(SDnodeEps *eps) { + if (eps == NULL) return; + + eps->dnodeNum = htonl(eps->dnodeNum); + for (int32_t i = 0; i < eps->dnodeNum; ++i) { + eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId); + eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort); + } + + pthread_mutex_lock(&tsEpsMutex); + if (eps->dnodeNum != tsEps->dnodeNum) { + dnodeResetEps(eps); + dnodeWriteEps(); + } else { + int32_t size = sizeof(SDnodeEps) + eps->dnodeNum * sizeof(SDnodeEp); + if (memcmp(eps, tsEps, size) != 0) { + dnodeResetEps(eps); + dnodeWriteEps(); + } + } + pthread_mutex_unlock(&tsEpsMutex); +} + +bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr) { + bool changed = false; + pthread_mutex_lock(&tsEpsMutex); + SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t)); + if (ep != NULL) { + char epSaved[TSDB_EP_LEN + 1]; + snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); + changed = strcmp(epstr, epSaved) != 0; + } + pthread_mutex_unlock(&tsEpsMutex); + return changed; +} + +void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { + pthread_mutex_lock(&tsEpsMutex); + SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t)); + if (ep != NULL) { + if (port) *port = ep->dnodePort; + if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); + if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); + } + pthread_mutex_unlock(&tsEpsMutex); +} + +static void dnodeResetEps(SDnodeEps *eps) { + if (eps == NULL) { + int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp); + if (tsEps == NULL) { + tsEps = calloc(1, size); + } else { + tsEps->dnodeNum = 0; + } + } else { + assert(tsEps); + + int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp) * eps->dnodeNum; + if (eps->dnodeNum > tsEps->dnodeNum) { + tsEps = realloc(tsEps, size); + } + memcpy(tsEps, eps, size); + dnodePrintEps(eps); + } + + for (int32_t i = 0; i < tsEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsEps->dnodeEps[i]; + taosHashPut(tsEpsHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); + } +} + +static void dnodePrintEps(SDnodeEps *eps) { + dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum); + for (int32_t i = 0; i < eps->dnodeNum; i++) { + SDnodeEp *ep = &eps->dnodeEps[i]; + dDebug("dnodeId:%d, dnodeFqdn:%s dnodePort:%u", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort); + } +} + +static int32_t dnodeReadEps() { + int32_t ret = -1; + int32_t len = 0; + int32_t maxLen = 30000; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; + SDnodeEps *eps = NULL; + + char file[TSDB_FILENAME_LEN + 20] = {0}; + sprintf(file, "%s/dnodeEps.json", tsDnodeDir); + + fp = fopen(file, "r"); + if (!fp) { + dDebug("failed to read %s, file not exist", file); + goto PRASE_EPS_OVER; + } + + len = fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s, content is null", file); + goto PRASE_EPS_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s, invalid json format", file); + goto PRASE_EPS_OVER; + } + + cJSON *dnodeNum = cJSON_GetObjectItem(root, "dnodeNum"); + if (!dnodeNum || dnodeNum->type != cJSON_Number) { + dError("failed to read %s, dnodeNum not found", file); + goto PRASE_EPS_OVER; + } + + cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); + if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { + dError("failed to read %s, dnodeInfos not found", file); + goto PRASE_EPS_OVER; + } + + int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); + if (dnodeInfosSize != dnodeNum->valueint) { + dError("failed to read %s, dnodeInfos size:%d not matched dnodeNum:%d", file, dnodeInfosSize, + (int32_t)dnodeNum->valueint); + goto PRASE_EPS_OVER; + } + + int32_t epsSize = sizeof(SDnodeEps) + dnodeInfosSize * sizeof(SDnodeEp); + eps = calloc(1, epsSize); + eps->dnodeNum = dnodeInfosSize; + + for (int32_t i = 0; i < dnodeInfosSize; ++i) { + cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); + if (dnodeInfo == NULL) break; + + SDnodeEp *ep = &eps->dnodeEps[i]; + + cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_Number) { + dError("failed to read %s, dnodeId not found", file); + goto PRASE_EPS_OVER; + } + ep->dnodeId = dnodeId->valueint; + + cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); + if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { + dError("failed to read %s, dnodeFqdn not found", file); + goto PRASE_EPS_OVER; + } + strncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); + + cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); + if (!dnodePort || dnodePort->type != cJSON_Number) { + dError("failed to read %s, dnodePort not found", file); + goto PRASE_EPS_OVER; + } + ep->dnodePort = (uint16_t)dnodePort->valueint; + } + + ret = 0; + + dInfo("read file %s successed", file); + dnodePrintEps(eps); + +PRASE_EPS_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + if (ret != 0) { + if (eps) free(eps); + eps = NULL; + } + + dnodeResetEps(eps); + if (eps) free(eps); + + return 0; +} + +static int32_t dnodeWriteEps() { + char file[TSDB_FILENAME_LEN + 20] = {0}; + sprintf(file, "%s/dnodeEps.json", tsDnodeDir); + + FILE *fp = fopen(file, "w"); + if (!fp) { + dError("failed to write %s, reason:%s", file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 30000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", tsEps->dnodeNum); + len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); + for (int32_t i = 0; i < tsEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsEps->dnodeEps[i]; + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", ep->dnodeId); + len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); + len += snprintf(content + len, maxLen - len, " \"dnodePort\": %u\n", ep->dnodePort); + if (i < tsEps->dnodeNum - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fflush(fp); + fclose(fp); + free(content); + + dInfo("successed to write %s", file); + return 0; +} diff --git a/src/dnode/src/dnodeMInfos.c b/src/dnode/src/dnodeMInfos.c new file mode 100644 index 0000000000..31e22514a3 --- /dev/null +++ b/src/dnode/src/dnodeMInfos.c @@ -0,0 +1,277 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "cJSON.h" +#include "tglobal.h" +#include "mnode.h" +#include "dnode.h" +#include "dnodeInt.h" +#include "dnodeMInfos.h" + +static SMnodeInfos tsMInfos; +static SRpcEpSet tsMEpSet; +static pthread_mutex_t tsMInfosMutex; + +static void dnodeResetMInfos(SMnodeInfos *minfos); +static void dnodePrintMInfos(SMnodeInfos *minfos); +static int32_t dnodeReadMInfos(); +static int32_t dnodeWriteMInfos(); + +int32_t dnodeInitMInfos() { + pthread_mutex_init(&tsMInfosMutex, NULL); + dnodeResetMInfos(NULL); + return dnodeReadMInfos(); +} + +void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); } + +void dnodeUpdateMInfos(SMnodeInfos *minfos) { + if (minfos->mnodeNum <= 0 || minfos->mnodeNum > 3) { + dError("invalid mnode infos, mnodeNum:%d", minfos->mnodeNum); + return; + } + + for (int32_t i = 0; i < minfos->mnodeNum; ++i) { + SMnodeInfo *minfo = &minfos->mnodeInfos[i]; + minfo->mnodeId = htonl(minfo->mnodeId); + if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) { + dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp); + return; + } + } + + pthread_mutex_lock(&tsMInfosMutex); + if (minfos->mnodeNum != tsMInfos.mnodeNum) { + dnodeResetMInfos(minfos); + dnodeWriteMInfos(); + sdbUpdateAsync(); + } else { + int32_t size = sizeof(SMnodeInfos); + if (memcmp(minfos, &tsMInfos, size) != 0) { + dnodeResetMInfos(minfos); + dnodeWriteMInfos(); + sdbUpdateAsync(); + } + } + pthread_mutex_unlock(&tsMInfosMutex); +} + +void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) { + if (ep->numOfEps <= 0) { + dError("mnode EP list for peer is changed, but content is invalid, discard it"); + return; + } + + pthread_mutex_lock(&tsMInfosMutex); + dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); + for (int i = 0; i < ep->numOfEps; ++i) { + ep->port[i] -= TSDB_PORT_DNODEDNODE; + dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); + } + tsMEpSet = *ep; + pthread_mutex_unlock(&tsMInfosMutex); +} + +bool dnodeIsMasterEp(char *ep) { + pthread_mutex_lock(&tsMInfosMutex); + bool isMaster = strcmp(ep, tsMInfos.mnodeInfos[tsMEpSet.inUse].mnodeEp) == 0; + pthread_mutex_unlock(&tsMInfosMutex); + + return isMaster; +} + +void dnodeGetMInfos(SMnodeInfos *minfos) { + pthread_mutex_lock(&tsMInfosMutex); + memcpy(minfos, &tsMInfos, sizeof(SMnodeInfos)); + for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) { + minfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId); + } + pthread_mutex_unlock(&tsMInfosMutex); +} + +void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsMInfosMutex); + *epSet = tsMEpSet; + for (int i = 0; i < epSet->numOfEps; ++i) { + epSet->port[i] += TSDB_PORT_DNODEDNODE; + } + pthread_mutex_unlock(&tsMInfosMutex); +} + +void dnodeGetEpSetForShell(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsMInfosMutex); + *epSet = tsMEpSet; + pthread_mutex_unlock(&tsMInfosMutex); +} + +static void dnodePrintMInfos(SMnodeInfos *minfos) { + dInfo("print mnode infos, mnodeNum:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse); + for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) { + dInfo("mnode index:%d, %s", tsMInfos.mnodeInfos[i].mnodeId, tsMInfos.mnodeInfos[i].mnodeEp); + } +} + +static void dnodeResetMInfos(SMnodeInfos *minfos) { + if (minfos == NULL) { + tsMEpSet.numOfEps = 1; + taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]); + + if (strcmp(tsSecond, tsFirst) != 0) { + tsMEpSet.numOfEps = 2; + taosGetFqdnPortFromEp(tsSecond, tsMEpSet.fqdn[1], &tsMEpSet.port[1]); + } + return; + } + + if (minfos->mnodeNum == 0) return; + + int32_t size = sizeof(SMnodeInfos); + memcpy(&tsMInfos, minfos, size); + + tsMEpSet.inUse = tsMInfos.inUse; + tsMEpSet.numOfEps = tsMInfos.mnodeNum; + for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) { + taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]); + } + + dnodePrintMInfos(minfos); +} + +static int32_t dnodeReadMInfos() { + int32_t len = 0; + int32_t maxLen = 2000; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; + SMnodeInfos minfos = {0}; + + char file[TSDB_FILENAME_LEN + 20] = {0}; + sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir); + + fp = fopen(file, "r"); + if (!fp) { + dDebug("failed to read %s, file not exist", file); + goto PARSE_MINFOS_OVER; + } + + len = fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s, content is null", file); + goto PARSE_MINFOS_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s, invalid json format", file); + goto PARSE_MINFOS_OVER; + } + + cJSON *inUse = cJSON_GetObjectItem(root, "inUse"); + if (!inUse || inUse->type != cJSON_Number) { + dError("failed to read mnodeEpSet.json, inUse not found"); + goto PARSE_MINFOS_OVER; + } + tsMInfos.inUse = inUse->valueint; + + cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum"); + if (!nodeNum || nodeNum->type != cJSON_Number) { + dError("failed to read mnodeEpSet.json, nodeNum not found"); + goto PARSE_MINFOS_OVER; + } + minfos.mnodeNum = nodeNum->valueint; + + cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + dError("failed to read mnodeEpSet.json, nodeInfos not found"); + goto PARSE_MINFOS_OVER; + } + + int size = cJSON_GetArraySize(nodeInfos); + if (size != minfos.mnodeNum) { + dError("failed to read mnodeEpSet.json, nodeInfos size not matched"); + goto PARSE_MINFOS_OVER; + } + + for (int i = 0; i < size; ++i) { + cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + dError("failed to read mnodeEpSet.json, nodeId not found"); + goto PARSE_MINFOS_OVER; + } + minfos.mnodeInfos[i].mnodeId = nodeId->valueint; + + cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); + if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { + dError("failed to read mnodeEpSet.json, nodeName not found"); + goto PARSE_MINFOS_OVER; + } + strncpy(minfos.mnodeInfos[i].mnodeEp, nodeEp->valuestring, TSDB_EP_LEN); + } + + dInfo("read file %s successed", file); + dnodePrintMInfos(&minfos); + +PARSE_MINFOS_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + dnodeResetMInfos(&minfos); + return 0; +} + +static int32_t dnodeWriteMInfos() { + char file[TSDB_FILENAME_LEN + 20] = {0}; + sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir); + + FILE *fp = fopen(file, "w"); + if (!fp) { + dError("failed to write %s, reason:%s", file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 2000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMInfos.inUse); + len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMInfos.mnodeNum); + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); + for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) { + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMInfos.mnodeInfos[i].mnodeId); + len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMInfos.mnodeInfos[i].mnodeEp); + if (i < tsMInfos.mnodeNum - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fflush(fp); + fclose(fp); + free(content); + + dInfo("successed to write %s", file); + return 0; +} diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 97e6f2ce6d..991ac8f9f9 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -24,6 +24,9 @@ #include "dnodeMgmt.h" #include "dnodePeer.h" #include "dnodeModule.h" +#include "dnodeEps.h" +#include "dnodeMInfos.h" +#include "dnodeCfg.h" #include "dnodeCheck.h" #include "dnodeVRead.h" #include "dnodeVWrite.h" @@ -33,23 +36,27 @@ #include "dnodeShell.h" #include "dnodeTelemetry.h" -static int32_t dnodeInitStorage(); -static void dnodeCleanupStorage(); -static void dnodeSetRunStatus(SDnodeRunStatus status); -static void dnodeCheckDataDirOpenned(char *dir); static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; + +static int32_t dnodeInitStorage(); +static void dnodeCleanupStorage(); +static void dnodeSetRunStatus(SDnodeRunStatus status); +static void dnodeCheckDataDirOpenned(char *dir); static int32_t dnodeInitComponents(); -static void dnodeCleanupComponents(int32_t stepId); -static int dnodeCreateDir(const char *dir); +static void dnodeCleanupComponents(int32_t stepId); +static int dnodeCreateDir(const char *dir); typedef struct { const char *const name; - int (*init)(); - void (*cleanup)(); + int32_t (*init)(); + void (*cleanup)(); } SDnodeComponent; static const SDnodeComponent tsDnodeComponents[] = { {"storage", dnodeInitStorage, dnodeCleanupStorage}, + {"eps", dnodeInitEps, dnodeCleanupEps}, + {"minfos", dnodeInitMInfos, dnodeCleanupMInfos}, + {"cfg", dnodeInitCfg, dnodeCleanupCfg}, {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 968a8d9759..c26b548fca 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -31,12 +31,13 @@ #include "mnode.h" #include "dnodeInt.h" #include "dnodeMgmt.h" +#include "dnodeEps.h" +#include "dnodeCfg.h" +#include "dnodeMInfos.h" #include "dnodeVRead.h" #include "dnodeVWrite.h" #include "dnodeModule.h" -#define MPEER_CONTENT_LEN 2000 - typedef struct { pthread_t thread; int32_t threadIndex; @@ -46,23 +47,13 @@ typedef struct { int32_t * vnodeList; } SOpenVnodeThread; -void * tsDnodeTmr = NULL; -static void * tsStatusTimer = NULL; -static uint32_t tsRebootTime; - -static SRpcEpSet tsDMnodeEpSet = {0}; -static SDMMnodeInfos tsDMnodeInfos = {0}; -static SDMDnodeCfg tsDnodeCfg = {0}; -static taos_qset tsMgmtQset = NULL; -static taos_queue tsMgmtQueue = NULL; -static pthread_t tsQthread; - -static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes); -static bool dnodeReadMnodeInfos(); -static void dnodeSaveMnodeInfos(); -static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg); -static bool dnodeReadDnodeCfg(); -static void dnodeSaveDnodeCfg(); +void * tsDnodeTmr = NULL; +static void * tsStatusTimer = NULL; +static uint32_t tsRebootTime; +static taos_qset tsMgmtQset = NULL; +static taos_queue tsMgmtQueue = NULL; +static pthread_t tsQthread; + static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void dnodeSendStatusMsg(void *handle, void *tmrId); static void *dnodeProcessMgmtQueue(void *param); @@ -74,7 +65,7 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); -static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg); +static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); int32_t dnodeInitMgmt() { @@ -86,27 +77,8 @@ int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg; dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); - dnodeReadDnodeCfg(); tsRebootTime = taosGetTimestampSec(); - if (!dnodeReadMnodeInfos()) { - memset(&tsDMnodeEpSet, 0, sizeof(SRpcEpSet)); - memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos)); - - tsDMnodeEpSet.numOfEps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDMnodeEpSet.fqdn[0], &tsDMnodeEpSet.port[0]); - - if (strcmp(tsSecond, tsFirst) != 0) { - tsDMnodeEpSet.numOfEps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDMnodeEpSet.fqdn[1], &tsDMnodeEpSet.port[1]); - } - } else { - tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse; - tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]); - } - } int32_t code = vnodeInitResources(); if (code != TSDB_CODE_SUCCESS) { @@ -470,10 +442,10 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; } - dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum); - for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) { - pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId); - dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp); + dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum); + for (int i = 0; i < pCfg->mnodes.mnodeNum; ++i) { + pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId); + dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp); } dnodeStartMnode(&pCfg->mnodes); @@ -481,34 +453,6 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } -void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { - if (pEpSet->numOfEps <= 0) { - dError("mnode EP list for peer is changed, but content is invalid, discard it"); - return; - } - - dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); - for (int i = 0; i < pEpSet->numOfEps; ++i) { - pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); - } - - tsDMnodeEpSet = *pEpSet; -} - -void dnodeGetMnodeEpSetForPeer(void *epSetRaw) { - SRpcEpSet *epSet = epSetRaw; - *epSet = tsDMnodeEpSet; - - for (int i=0; inumOfEps; ++i) - epSet->port[i] += TSDB_PORT_DNODEDNODE; -} - -void dnodeGetMnodeEpSetForShell(void *epSetRaw) { - SRpcEpSet *epSet = epSetRaw; - *epSet = tsDMnodeEpSet; -} - static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { if (pMsg->code != TSDB_CODE_SUCCESS) { dError("status rsp is received, error:%s", tstrerror(pMsg->code)); @@ -517,201 +461,23 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { } SDMStatusRsp *pStatusRsp = pMsg->pCont; - SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes; - if (pMnodes->nodeNum <= 0) { - dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum); - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - return; - } + SMnodeInfos *minfos = &pStatusRsp->mnodes; + dnodeUpdateMInfos(minfos); - SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; - pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); + SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; + pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); pCfg->moduleStatus = htonl(pCfg->moduleStatus); pCfg->dnodeId = htonl(pCfg->dnodeId); - - for (int32_t i = 0; i < pMnodes->nodeNum; ++i) { - SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i]; - pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId); - } + dnodeUpdateCfg(pCfg); vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); - // will not set mnode in status msg - // dnodeProcessModuleStatus(pCfg->moduleStatus); - dnodeUpdateDnodeCfg(pCfg); + SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SDMVgroupAccess)); + dnodeUpdateEps(pEps); - dnodeUpdateMnodeInfos(pMnodes); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); } -static bool dnodeCheckMnodeInfos(SDMMnodeInfos *pMnodes) { - if (pMnodes->nodeNum <= 0 || pMnodes->nodeNum > 3) { - dError("invalid mnode infos, num:%d", pMnodes->nodeNum); - return false; - } - - for (int32_t i = 0; i < pMnodes->nodeNum; ++i) { - SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i]; - if (pMnodeInfo->nodeId <= 0 || strlen(pMnodeInfo->nodeEp) <= 5) { - dError("invalid mnode info:%d, nodeId:%d nodeEp:%s", i, pMnodeInfo->nodeId, pMnodeInfo->nodeEp); - return false; - } - } - - return true; -} - -static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { - bool mnodesChanged = (memcmp(&tsDMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0); - bool mnodesNotInit = (tsDMnodeInfos.nodeNum == 0); - if (!(mnodesChanged || mnodesNotInit)) return; - - if (!dnodeCheckMnodeInfos(pMnodes)) return; - - memcpy(&tsDMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)); - dInfo("mnode infos is changed, nodeNum:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse); - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - dInfo("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); - } - - tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse; - tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]); - } - - dnodeSaveMnodeInfos(); - sdbUpdateAsync(); -} - -static bool dnodeReadMnodeInfos() { - char ipFile[TSDB_FILENAME_LEN*2] = {0}; - - sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir); - FILE *fp = fopen(ipFile, "r"); - if (!fp) { - dDebug("failed to read mnodeEpSet.json, file not exist"); - return false; - } - - bool ret = false; - int maxLen = 2000; - char *content = calloc(1, maxLen + 1); - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - free(content); - fclose(fp); - dError("failed to read mnodeEpSet.json, content is null"); - return false; - } - - content[len] = 0; - cJSON* root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read mnodeEpSet.json, invalid json format"); - goto PARSE_OVER; - } - - cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); - if (!inUse || inUse->type != cJSON_Number) { - dError("failed to read mnodeEpSet.json, inUse not found"); - goto PARSE_OVER; - } - tsDMnodeInfos.inUse = inUse->valueint; - - cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); - if (!nodeNum || nodeNum->type != cJSON_Number) { - dError("failed to read mnodeEpSet.json, nodeNum not found"); - goto PARSE_OVER; - } - tsDMnodeInfos.nodeNum = nodeNum->valueint; - - cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); - if (!nodeInfos || nodeInfos->type != cJSON_Array) { - dError("failed to read mnodeEpSet.json, nodeInfos not found"); - goto PARSE_OVER; - } - - int size = cJSON_GetArraySize(nodeInfos); - if (size != tsDMnodeInfos.nodeNum) { - dError("failed to read mnodeEpSet.json, nodeInfos size not matched"); - goto PARSE_OVER; - } - - for (int i = 0; i < size; ++i) { - cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); - if (nodeInfo == NULL) continue; - - cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); - if (!nodeId || nodeId->type != cJSON_Number) { - dError("failed to read mnodeEpSet.json, nodeId not found"); - goto PARSE_OVER; - } - tsDMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint; - - cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); - if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - dError("failed to read mnodeEpSet.json, nodeName not found"); - goto PARSE_OVER; - } - strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); - } - - ret = true; - - dInfo("read mnode epSet successed, numOfEps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse); - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - dInfo("mnode:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp); - } - -PARSE_OVER: - free(content); - cJSON_Delete(root); - fclose(fp); - return ret; -} - -static void dnodeSaveMnodeInfos() { - char ipFile[TSDB_FILENAME_LEN] = {0}; - sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir); - FILE *fp = fopen(ipFile, "w"); - if (!fp) return; - - int32_t len = 0; - int32_t maxLen = 2000; - char * content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDMnodeInfos.inUse); - len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDMnodeInfos.nodeNum); - len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); - for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) { - len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDMnodeInfos.nodeInfos[i].nodeId); - len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDMnodeInfos.nodeInfos[i].nodeEp); - if (i < tsDMnodeInfos.nodeNum -1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - fflush(fp); - fclose(fp); - free(content); - - dInfo("save mnode epSet successed"); -} - -char *dnodeGetMnodeMasterEp() { - return tsDMnodeInfos.nodeInfos[tsDMnodeEpSet.inUse].nodeEp; -} - -void* dnodeGetMnodeInfos() { - return &tsDMnodeInfos; -} - static void dnodeSendStatusMsg(void *handle, void *tmrId) { if (tsDnodeTmr == NULL) { dError("dnode timer is already released"); @@ -732,14 +498,13 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { return; } - //strcpy(pStatus->dnodeName, tsDnodeName); + dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId); + pStatus->dnodeId = htonl(dnodeGetDnodeId()); pStatus->version = htonl(tsVersion); - pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId); pStatus->lastReboot = htonl(tsRebootTime); pStatus->numOfCores = htons((uint16_t) tsNumOfCores); pStatus->diskAvailable = tsAvailDataDirGB; pStatus->alternativeRole = (uint8_t) tsAlternativeRole; - tstrncpy(pStatus->clusterId, tsDnodeCfg.clusterId, TSDB_CLUSTER_ID_LEN); tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); // fill cluster cfg parameters @@ -769,110 +534,19 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { }; SRpcEpSet epSet; - dnodeGetMnodeEpSetForPeer(&epSet); + dnodeGetEpSetForPeer(&epSet); dnodeSendMsgToDnode(&epSet, &rpcMsg); } -static bool dnodeReadDnodeCfg() { - char dnodeCfgFile[TSDB_FILENAME_LEN*2] = {0}; - - sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir); - - FILE *fp = fopen(dnodeCfgFile, "r"); - if (!fp) { - dDebug("failed to read dnodeCfg.json, file not exist"); - return false; - } - - bool ret = false; - int maxLen = 100; - char *content = calloc(1, maxLen + 1); - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - free(content); - fclose(fp); - dError("failed to read dnodeCfg.json, content is null"); - return false; - } - - content[len] = 0; - cJSON* root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read dnodeCfg.json, invalid json format"); - goto PARSE_CFG_OVER; - } - - cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read dnodeCfg.json, dnodeId not found"); - goto PARSE_CFG_OVER; - } - tsDnodeCfg.dnodeId = dnodeId->valueint; - - cJSON* clusterId = cJSON_GetObjectItem(root, "clusterId"); - if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read dnodeCfg.json, clusterId not found"); - goto PARSE_CFG_OVER; - } - tstrncpy(tsDnodeCfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN); - - ret = true; - - dInfo("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId); - -PARSE_CFG_OVER: - free(content); - cJSON_Delete(root); - fclose(fp); - return ret; -} - -static void dnodeSaveDnodeCfg() { - char dnodeCfgFile[TSDB_FILENAME_LEN] = {0}; - sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir); - - FILE *fp = fopen(dnodeCfgFile, "w"); - if (!fp) return; - - int32_t len = 0; - int32_t maxLen = 200; - char * content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsDnodeCfg.dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsDnodeCfg.clusterId); - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - fflush(fp); - fclose(fp); - free(content); - - dInfo("save dnodeId successed"); -} - -void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) { - if (tsDnodeCfg.dnodeId == 0) { - dInfo("dnodeId is set to %d, clusterId is set to %s", pCfg->dnodeId, pCfg->clusterId); - tsDnodeCfg.dnodeId = pCfg->dnodeId; - tstrncpy(tsDnodeCfg.clusterId, pCfg->clusterId, TSDB_CLUSTER_ID_LEN); - dnodeSaveDnodeCfg(); - } -} - -int32_t dnodeGetDnodeId() { - return tsDnodeCfg.dnodeId; -} - void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { SRpcConnInfo connInfo = {0}; rpcGetConnInfo(rpcMsg->handle, &connInfo); SRpcEpSet epSet = {0}; if (forShell) { - dnodeGetMnodeEpSetForShell(&epSet); + dnodeGetEpSetForShell(&epSet); } else { - dnodeGetMnodeEpSetForPeer(&epSet); + dnodeGetEpSetForPeer(&epSet); } dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType], diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 46376159c6..4cc59ec40d 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -146,8 +146,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { } } -bool dnodeStartMnode(void *pMnodes) { - SDMMnodeInfos *mnodes = pMnodes; +bool dnodeStartMnode(SMnodeInfos *minfos) { + SMnodeInfos *mnodes = minfos; if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) { dDebug("mnode module is already started, module status:%d", tsModuleStatus); diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 3bc2f7b48b..48efdf69c1 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -28,8 +28,8 @@ #include "dnodeMgmt.h" #include "dnodeVWrite.h" #include "dnodeMPeer.h" +#include "dnodeMInfos.h" -extern void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); @@ -151,7 +151,7 @@ void dnodeCleanupClient() { static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeEpSetForPeer(pEpSet); + dnodeUpdateEpSetForPeer(pEpSet); } if (dnodeProcessRspMsgFp[pMsg->msgType]) { @@ -173,7 +173,7 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; - dnodeGetMnodeEpSetForPeer(&epSet); + dnodeGetEpSetForPeer(&epSet); rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp); } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index e84545be17..f7ebed31cf 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "trpc.h" +#include "taosmsg.h" typedef struct { int32_t queryReqNum; @@ -38,12 +39,13 @@ SDnodeRunStatus dnodeGetRunStatus(); SDnodeStatisInfo dnodeGetStatisInfo(); bool dnodeIsFirstDeploy(); -char * dnodeGetMnodeMasterEp(); -void dnodeGetMnodeEpSetForPeer(void *epSet); -void dnodeGetMnodeEpSetForShell(void *epSet); -void * dnodeGetMnodeInfos(); +bool dnodeIsMasterEp(char *ep); +void dnodeGetEpSetForPeer(SRpcEpSet *epSet); +void dnodeGetEpSetForShell(SRpcEpSet *epSet); int32_t dnodeGetDnodeId(); -bool dnodeStartMnode(void *pModes); +void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); +bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr); +bool dnodeStartMnode(SMnodeInfos *minfos); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 600347c44f..ae59d0ffc4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -308,12 +308,12 @@ typedef struct { } SUpdateTableTagValMsg; typedef struct { - char clientVersion[TSDB_VERSION_LEN]; - char msgVersion[TSDB_VERSION_LEN]; - char db[TSDB_TABLE_FNAME_LEN]; - char appName[TSDB_APPNAME_LEN]; + char clientVersion[TSDB_VERSION_LEN]; + char msgVersion[TSDB_VERSION_LEN]; + char db[TSDB_TABLE_FNAME_LEN]; + char appName[TSDB_APPNAME_LEN]; int32_t pid; -} SCMConnectMsg; +} SConnectMsg; typedef struct { char acctId[TSDB_ACCT_LEN]; @@ -324,7 +324,7 @@ typedef struct { int8_t reserved2; int32_t connId; SRpcEpSet epSet; -} SCMConnectRsp; +} SConnectRsp; typedef struct { int32_t maxUsers; @@ -344,7 +344,7 @@ typedef struct { char user[TSDB_USER_LEN]; char pass[TSDB_KEY_LEN]; SAcctCfg cfg; -} SCMCreateAcctMsg, SCMAlterAcctMsg; +} SCreateAcctMsg, SAlterAcctMsg; typedef struct { char user[TSDB_USER_LEN]; @@ -568,18 +568,29 @@ typedef struct { uint32_t numOfVnodes; char clusterId[TSDB_CLUSTER_ID_LEN]; char reserved[16]; -} SDMDnodeCfg; +} SDnodeCfg; + +typedef struct { + int32_t dnodeId; + uint16_t dnodePort; + char dnodeFqdn[TSDB_FQDN_LEN]; +} SDnodeEp; + +typedef struct { + int32_t dnodeNum; + SDnodeEp dnodeEps[]; +} SDnodeEps; typedef struct { - int32_t nodeId; - char nodeEp[TSDB_EP_LEN]; -} SDMMnodeInfo; + int32_t mnodeId; + char mnodeEp[TSDB_EP_LEN]; +} SMnodeInfo; typedef struct { - int8_t inUse; - int8_t nodeNum; - SDMMnodeInfo nodeInfos[TSDB_MAX_REPLICA]; -} SDMMnodeInfos; + int8_t inUse; + int8_t mnodeNum; + SMnodeInfo mnodeInfos[TSDB_MAX_REPLICA]; +} SMnodeInfos; typedef struct { int32_t numOfMnodes; // tsNumOfMnodes @@ -614,9 +625,9 @@ typedef struct { } SDMStatusMsg; typedef struct { - SDMMnodeInfos mnodes; - SDMDnodeCfg dnodeCfg; - SDMVgroupAccess vgAccess[]; + SMnodeInfos mnodes; + SDnodeCfg dnodeCfg; + SDMVgroupAccess vgAccess[]; } SDMStatusRsp; typedef struct { @@ -742,7 +753,7 @@ typedef struct { typedef struct { int32_t dnodeId; char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port - SDMMnodeInfos mnodes; + SMnodeInfos mnodes; } SMDCreateMnodeMsg; typedef struct { diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 1cd861e223..c9e8b8c87a 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -39,11 +39,15 @@ #include "mnodeCluster.h" int32_t tsAccessSquence = 0; -static void *tsDnodeSdb = NULL; +static void * tsDnodeSdb = NULL; static int32_t tsDnodeUpdateSize = 0; extern void * tsMnodeSdb; extern void * tsVgroupSdb; +static SDnodeEps*tsDnodeEps; +static int32_t tsDnodeEpsSize; +static pthread_mutex_t tsDnodeEpsMutex; + static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg); @@ -59,6 +63,7 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole); +static void mnodeUpdateDnodeEps(); static char* offlineReason[] = { "", @@ -95,6 +100,9 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) { pDnode->offlineReason = TAOS_DN_OFF_STATUS_NOT_RECEIVED; } + dnodeUpdateEp(pDnode->dnodeId, pDnode->dnodeEp, pDnode->dnodeFqdn, &pDnode->dnodePort); + mnodeUpdateDnodeEps(); + mInfo("dnode:%d, fqdn:%s ep:%s port:%d, do insert action", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp, pDnode->dnodePort); return TSDB_CODE_SUCCESS; } @@ -107,6 +115,7 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) { #endif mnodeDropMnodeLocal(pDnode->dnodeId); balanceAsyncNotify(); + mnodeUpdateDnodeEps(); mDebug("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId); return TSDB_CODE_SUCCESS; @@ -121,6 +130,7 @@ static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) { } mnodeDecDnodeRef(pDnode); + mnodeUpdateDnodeEps(); return TSDB_CODE_SUCCESS; } @@ -152,12 +162,14 @@ static int32_t mnodeDnodeActionRestored() { } } + mnodeUpdateDnodeEps(); return TSDB_CODE_SUCCESS; } int32_t mnodeInitDnodes() { SDnodeObj tObj; tsDnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + pthread_mutex_init(&tsDnodeEpsMutex, NULL); SSdbTableDesc tableDesc = { .tableId = SDB_TABLE_DNODE, @@ -201,6 +213,9 @@ int32_t mnodeInitDnodes() { void mnodeCleanupDnodes() { sdbCloseTable(tsDnodeSdb); + pthread_mutex_destroy(&tsDnodeEpsMutex); + free(tsDnodeEps); + tsDnodeEps = NULL; tsDnodeSdb = NULL; } @@ -418,6 +433,48 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) { return 0; } +static int32_t mnodeGetDnodeEpsSize() { + pthread_mutex_lock(&tsDnodeEpsMutex); + int32_t size = tsDnodeEpsSize; + pthread_mutex_unlock(&tsDnodeEpsMutex); + return size; +} + +static void mnodeGetDnodeEpsData(SDnodeEps *pEps) { + pthread_mutex_lock(&tsDnodeEpsMutex); + memcpy(pEps, tsDnodeEps, tsDnodeEpsSize); + pthread_mutex_unlock(&tsDnodeEpsMutex); +} + +static void mnodeUpdateDnodeEps() { + pthread_mutex_lock(&tsDnodeEpsMutex); + + int32_t totalDnodes = mnodeGetDnodesNum(); + tsDnodeEpsSize = sizeof(SDnodeEps) + totalDnodes * sizeof(SDnodeEp); + tsDnodeEps = calloc(1, tsDnodeEpsSize); + tsDnodeEps->dnodeNum = htonl(totalDnodes); + + SDnodeObj *pDnode = NULL; + void * pIter = NULL; + int32_t dnodesNum = 0; + + while (1) { + pIter = mnodeGetNextDnode(pIter, &pDnode); + if (pDnode == NULL) break; + if (dnodesNum >= totalDnodes) break; + + SDnodeEp *pEp = &tsDnodeEps->dnodeEps[dnodesNum]; + dnodesNum++; + pEp->dnodeId = htonl(pDnode->dnodeId); + pEp->dnodePort = htons(pDnode->dnodePort); + tstrncpy(pEp->dnodeFqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); + mnodeDecDnodeRef(pDnode); + } + + sdbFreeIter(pIter); + pthread_mutex_unlock(&tsDnodeEpsMutex); +} + static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { SDnodeObj *pDnode = NULL; SDMStatusMsg *pStatus = pMsg->rpcMsg.pCont; @@ -477,7 +534,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { } int32_t openVnodes = htons(pStatus->openVnodes); - int32_t contLen = sizeof(SDMStatusRsp) + openVnodes * sizeof(SDMVgroupAccess); + int32_t contLen = sizeof(SDMStatusRsp) + openVnodes * sizeof(SDMVgroupAccess) + mnodeGetDnodeEpsSize(); SDMStatusRsp *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { mnodeDecDnodeRef(pDnode); @@ -489,7 +546,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { pRsp->dnodeCfg.numOfVnodes = htonl(openVnodes); tstrncpy(pRsp->dnodeCfg.clusterId, mnodeGetClusterId(), TSDB_CLUSTER_ID_LEN); SDMVgroupAccess *pAccess = (SDMVgroupAccess *)((char *)pRsp + sizeof(SDMStatusRsp)); - + for (int32_t j = 0; j < openVnodes; ++j) { SVnodeLoad *pVload = &pStatus->load[j]; pVload->vgId = htonl(pVload->vgId); @@ -539,6 +596,9 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { mnodeDecDnodeRef(pDnode); + SDnodeEps *pEps = (SDnodeEps *)((char *)pAccess + openVnodes * sizeof(SDMVgroupAccess)); + mnodeGetDnodeEpsData(pEps); + pMsg->rpcRsp.len = contLen; pMsg->rpcRsp.rsp = pRsp; diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 89b2f50b73..a6f0d1f719 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -38,7 +38,7 @@ static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; static SRpcEpSet tsMnodeEpSetForShell; static SRpcEpSet tsMnodeEpSetForPeer; -static SDMMnodeInfos tsMnodeInfos; +static SMnodeInfos tsMnodeInfos; static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -70,8 +70,9 @@ static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) { pDnode->isMgmt = true; mnodeDecDnodeRef(pDnode); - - mInfo("mnode:%d, fqdn:%s ep:%s port:%d, do insert action", pMnode->mnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp, pDnode->dnodePort); + + mInfo("mnode:%d, fqdn:%s ep:%s port:%u, do insert action", pMnode->mnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp, + pDnode->dnodePort); return TSDB_CODE_SUCCESS; } @@ -202,7 +203,7 @@ void mnodeUpdateMnodeEpSet() { memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet)); memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet)); - memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos)); + memset(&tsMnodeInfos, 0, sizeof(SMnodeInfos)); int32_t index = 0; void * pIter = NULL; @@ -221,8 +222,8 @@ void mnodeUpdateMnodeEpSet() { tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index])); - tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId); - strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp); + tsMnodeInfos.mnodeInfos[index].mnodeId = htonl(pMnode->mnodeId); + strcpy(tsMnodeInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp); if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { tsMnodeEpSetForShell.inUse = index; @@ -238,7 +239,7 @@ void mnodeUpdateMnodeEpSet() { mnodeDecMnodeRef(pMnode); } - tsMnodeInfos.nodeNum = index; + tsMnodeInfos.mnodeNum = index; tsMnodeEpSetForShell.numOfEps = index; tsMnodeEpSetForPeer.numOfEps = index; @@ -260,12 +261,12 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { } char* mnodeGetMnodeMasterEp() { - return tsMnodeInfos.nodeInfos[tsMnodeInfos.inUse].nodeEp; + return tsMnodeInfos.mnodeInfos[tsMnodeInfos.inUse].mnodeEp; } void mnodeGetMnodeInfos(void *mnodeInfos) { mnodeMnodeRdLock(); - *(SDMMnodeInfos *)mnodeInfos = tsMnodeInfos; + *(SMnodeInfos *)mnodeInfos = tsMnodeInfos; mnodeMnodeUnLock(); } @@ -280,15 +281,15 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) { tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); pCreate->mnodes = tsMnodeInfos; bool found = false; - for (int i = 0; i < pCreate->mnodes.nodeNum; ++i) { - if (pCreate->mnodes.nodeInfos[i].nodeId == htonl(dnodeId)) { + for (int i = 0; i < pCreate->mnodes.mnodeNum; ++i) { + if (pCreate->mnodes.mnodeInfos[i].mnodeId == htonl(dnodeId)) { found = true; } } if (!found) { - pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeId = htonl(dnodeId); - tstrncpy(pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); - pCreate->mnodes.nodeNum++; + pCreate->mnodes.mnodeInfos[pCreate->mnodes.mnodeNum].mnodeId = htonl(dnodeId); + tstrncpy(pCreate->mnodes.mnodeInfos[pCreate->mnodes.mnodeNum].mnodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); + pCreate->mnodes.mnodeNum++; } } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 14558485aa..8fcc12ea31 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -305,7 +305,7 @@ void sdbUpdateAsync() { } void sdbUpdateSync(void *pMnodes) { - SDMMnodeInfos *mnodes = pMnodes; + SMnodeInfos *mnodes = pMnodes; if (!mnodeIsRunning()) { mDebug("mnode not start yet, update sync config later"); return; @@ -339,10 +339,10 @@ void sdbUpdateSync(void *pMnodes) { syncCfg.replica = index; mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica); } else { - for (index = 0; index < mnodes->nodeNum; ++index) { - SDMMnodeInfo *node = &mnodes->nodeInfos[index]; - syncCfg.nodeInfo[index].nodeId = node->nodeId; - taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort); + for (index = 0; index < mnodes->mnodeNum; ++index) { + SMnodeInfo *node = &mnodes->mnodeInfos[index]; + syncCfg.nodeInfo[index].nodeId = node->mnodeId; + taosGetFqdnPortFromEp(node->mnodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort); syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC; } syncCfg.replica = index; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 80909e99ae..4a77d38b71 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -283,8 +283,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { } static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { - SCMConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont; - SCMConnectRsp *pConnectRsp = NULL; + SConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont; + SConnectRsp *pConnectRsp = NULL; int32_t code = TSDB_CODE_SUCCESS; SRpcConnInfo connInfo = {0}; @@ -320,7 +320,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { mnodeDecDbRef(pDb); } - pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); + pConnectRsp = rpcMallocCont(sizeof(SConnectRsp)); if (pConnectRsp == NULL) { code = TSDB_CODE_MND_OUT_OF_MEMORY; goto connect_over; @@ -349,7 +349,7 @@ connect_over: } else { mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code)); pMsg->rpcRsp.rsp = pConnectRsp; - pMsg->rpcRsp.len = sizeof(SCMConnectRsp); + pMsg->rpcRsp.len = sizeof(SConnectRsp); } return code; diff --git a/src/vnode/inc/vnodeCfg.h b/src/vnode/inc/vnodeCfg.h new file mode 100644 index 0000000000..8acd2e4c2b --- /dev/null +++ b/src/vnode/inc/vnodeCfg.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_CFG_H +#define TDENGINE_VNODE_CFG_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t vnodeReadCfg(SVnodeObj *pVnode); +int32_t vnodeWriteCfg(SMDCreateVnodeMsg *pVnodeCfg); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/inc/vnodeVersion.h b/src/vnode/inc/vnodeVersion.h new file mode 100644 index 0000000000..1d086cb21f --- /dev/null +++ b/src/vnode/inc/vnodeVersion.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_VERSION_H +#define TDENGINE_VNODE_VERSION_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t vnodeReadVersion(SVnodeObj *pVnode); +int32_t vnodeSaveVersion(SVnodeObj *pVnode); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c new file mode 100644 index 0000000000..d758c6a2d3 --- /dev/null +++ b/src/vnode/src/vnodeCfg.c @@ -0,0 +1,322 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "taoserror.h" +#include "cJSON.h" +#include "tglobal.h" +#include "tsdb.h" +#include "dnode.h" +#include "vnodeInt.h" +#include "vnodeVersion.h" +#include "vnodeCfg.h" + +static void vnodeLoadCfg(SVnodeObj *pVnode, SMDCreateVnodeMsg* vnodeMsg) { + strcpy(pVnode->db, vnodeMsg->db); + pVnode->cfgVersion = vnodeMsg->cfg.cfgVersion; + pVnode->tsdbCfg.cacheBlockSize = vnodeMsg->cfg.cacheBlockSize; + pVnode->tsdbCfg.totalBlocks = vnodeMsg->cfg.totalBlocks; + pVnode->tsdbCfg.daysPerFile = vnodeMsg->cfg.daysPerFile; + pVnode->tsdbCfg.keep = vnodeMsg->cfg.daysToKeep; + pVnode->tsdbCfg.keep1 = vnodeMsg->cfg.daysToKeep1; + pVnode->tsdbCfg.keep2 = vnodeMsg->cfg.daysToKeep2; + pVnode->tsdbCfg.minRowsPerFileBlock = vnodeMsg->cfg.minRowsPerFileBlock; + pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock; + pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision; + pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression; + pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel; + pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod; + pVnode->walCfg.wals = vnodeMsg->cfg.wals; + pVnode->walCfg.keep = 0; + pVnode->syncCfg.replica = vnodeMsg->cfg.replications; + pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum; + + for (int i = 0; i < pVnode->syncCfg.replica; ++i) { + SMDVnodeDesc *node = &vnodeMsg->nodes[i]; + pVnode->syncCfg.nodeInfo[i].nodeId = node->nodeId; + taosGetFqdnPortFromEp(node->nodeEp, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort); + pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; + } + + vInfo("vgId:%d, load vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); + for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { + SNodeInfo *node = &pVnode->syncCfg.nodeInfo[i]; + vInfo("vgId:%d, dnode:%d, %s:%u", pVnode->vgId, node->nodeId, node->nodeFqdn, node->nodePort); + } +} + +int32_t vnodeReadCfg(SVnodeObj *pVnode) { + int32_t ret = TSDB_CODE_VND_APP_ERROR; + int32_t len = 0; + int maxLen = 1000; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; + bool nodeChanged = false; + SMDCreateVnodeMsg vnodeMsg; + + char file[TSDB_FILENAME_LEN + 30] = {0}; + sprintf(file, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId); + + fp = fopen(file, "r"); + if (!fp) { + vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId, file, strerror(errno)); + ret = TAOS_SYSTEM_ERROR(errno); + goto PARSE_VCFG_ERROR; + } + + len = fread(content, 1, maxLen, fp); + if (len <= 0) { + vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + + cJSON *db = cJSON_GetObjectItem(root, "db"); + if (!db || db->type != cJSON_String || db->valuestring == NULL) { + vError("vgId:%d, failed to read %s, db not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + strcpy(vnodeMsg.db, db->valuestring); + + cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion"); + if (!cfgVersion || cfgVersion->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.cfgVersion = cfgVersion->valueint; + + cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); + if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, cacheBlockSize not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.cacheBlockSize = cacheBlockSize->valueint; + + cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); + if (!totalBlocks || totalBlocks->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, totalBlocks not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.totalBlocks = totalBlocks->valueint; + + cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); + if (!daysPerFile || daysPerFile->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, daysPerFile not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.daysPerFile = daysPerFile->valueint; + + cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep"); + if (!daysToKeep || daysToKeep->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, daysToKeep not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.daysToKeep = daysToKeep->valueint; + + cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); + if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, daysToKeep1 not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.daysToKeep1 = daysToKeep1->valueint; + + cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); + if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, daysToKeep2 not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.daysToKeep2 = daysToKeep2->valueint; + + cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); + if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint; + + cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); + if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; + + cJSON *precision = cJSON_GetObjectItem(root, "precision"); + if (!precision || precision->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, precision not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.precision = (int8_t)precision->valueint; + + cJSON *compression = cJSON_GetObjectItem(root, "compression"); + if (!compression || compression->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, compression not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.compression = (int8_t)compression->valueint; + + cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel"); + if (!walLevel || walLevel->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, walLevel not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.walLevel = (int8_t)walLevel->valueint; + + cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync"); + if (!walLevel || walLevel->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, fsyncPeriod not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.fsyncPeriod = fsyncPeriod->valueint; + + cJSON *wals = cJSON_GetObjectItem(root, "wals"); + if (!wals || wals->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, wals not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.wals = (int8_t)wals->valueint; + + cJSON *replica = cJSON_GetObjectItem(root, "replica"); + if (!replica || replica->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, replica not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.replications = (int8_t)replica->valueint; + + cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); + if (!quorum || quorum->type != cJSON_Number) { + vError("vgId: %d, failed to read %s, quorum not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + vnodeMsg.cfg.quorum = (int8_t)quorum->valueint; + + cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + + int size = cJSON_GetArraySize(nodeInfos); + if (size != vnodeMsg.cfg.replications) { + vError("vgId:%d, failed to read %s, nodeInfos size not matched", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + + for (int i = 0; i < size; ++i) { + cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + SMDVnodeDesc *node = &vnodeMsg.nodes[i]; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, nodeId not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + node->nodeId = nodeId->valueint; + + cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); + if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { + vError("vgId:%d, failed to read %s, nodeFqdn not found", pVnode->vgId, file); + goto PARSE_VCFG_ERROR; + } + tstrncpy(node->nodeEp, nodeEp->valuestring, TSDB_EP_LEN); + + if (!nodeChanged) { + nodeChanged = dnodeCheckEpChanged(node->nodeId, node->nodeEp); + } + } + + ret = TSDB_CODE_SUCCESS; + +PARSE_VCFG_ERROR: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + if (nodeChanged) { + vnodeWriteCfg(&vnodeMsg); + } + + if (ret == TSDB_CODE_SUCCESS) { + vnodeLoadCfg(pVnode, &vnodeMsg); + } + + return ret; +} + +int32_t vnodeWriteCfg(SMDCreateVnodeMsg *pMsg) { + char file[TSDB_FILENAME_LEN + 30] = {0}; + sprintf(file, "%s/vnode%d/config.json", tsVnodeDir, pMsg->cfg.vgId); + + FILE *fp = fopen(file, "w"); + if (!fp) { + vError("vgId:%d, failed to write %s error:%s", pMsg->cfg.vgId, file, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } + + int32_t len = 0; + int32_t maxLen = 1000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pMsg->db); + len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pMsg->cfg.cfgVersion); + len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pMsg->cfg.cacheBlockSize); + len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pMsg->cfg.totalBlocks); + len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pMsg->cfg.daysPerFile); + len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pMsg->cfg.daysToKeep); + len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pMsg->cfg.daysToKeep1); + len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pMsg->cfg.daysToKeep2); + len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pMsg->cfg.minRowsPerFileBlock); + len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pMsg->cfg.maxRowsPerFileBlock); + len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pMsg->cfg.precision); + len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pMsg->cfg.compression); + len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pMsg->cfg.walLevel); + len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pMsg->cfg.fsyncPeriod); + len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications); + len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals); + len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum); + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); + for (int32_t i = 0; i < pMsg->cfg.replications; i++) { + SMDVnodeDesc *node = &pMsg->nodes[i]; + dnodeUpdateEp(node->nodeId, node->nodeEp, NULL, NULL); + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", node->nodeId); + len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", node->nodeEp); + if (i < pMsg->cfg.replications - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fflush(fp); + fclose(fp); + free(content); + + vInfo("vgId:%d, successed to write %s", pMsg->cfg.vgId, file); + return TSDB_CODE_SUCCESS; +} diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index e529f27f55..fefc6612fb 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -31,15 +31,11 @@ #include "vnodeInt.h" #include "query.h" #include "dnode.h" - -#define TSDB_VNODE_VERSION_CONTENT_LEN 31 +#include "vnodeCfg.h" +#include "vnodeVersion.h" static SHashObj*tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); -static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); -static int32_t vnodeReadCfg(SVnodeObj *pVnode); -static int32_t vnodeSaveVersion(SVnodeObj *pVnode); -static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); @@ -128,7 +124,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { } } - code = vnodeSaveCfg(pVnodeCfg); + code = vnodeWriteCfg(pVnodeCfg); if (code != TSDB_CODE_SUCCESS) { vError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code)); return code; @@ -138,7 +134,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize; tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks; - // tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables; tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep; tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock; @@ -186,7 +181,7 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { return TSDB_CODE_SUCCESS; } - int32_t code = vnodeSaveCfg(pVnodeCfg); + int32_t code = vnodeWriteCfg(pVnodeCfg); if (code != TSDB_CODE_SUCCESS) { pVnode->status = TAOS_VN_STATUS_READY; return code; @@ -601,17 +596,19 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { SVnodeObj *pVnode = arg; if (status == TSDB_STATUS_COMMIT_START) { - pVnode->fversion = pVnode->version; + pVnode->fversion = pVnode->version; return walRenew(pVnode->wal); } - if (status == TSDB_STATUS_COMMIT_OVER) + if (status == TSDB_STATUS_COMMIT_OVER) { return vnodeSaveVersion(pVnode); + } - return 0; + return 0; } -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, + uint64_t *fversion) { SVnodeObj *pVnode = ahandle; *fversion = pVnode->fversion; return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); @@ -636,17 +633,18 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) { SVnodeObj *pVnode = ahandle; - if (pVnode->delay != mseconds) + if (pVnode->delay != mseconds) { vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); + } pVnode->delay = mseconds; } -static int vnodeResetTsdb(SVnodeObj *pVnode) -{ +static int vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) { + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != + TAOS_VN_STATUS_READY) { return -1; } @@ -671,7 +669,7 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); pVnode->status = TAOS_VN_STATUS_READY; - vnodeRelease(pVnode); + vnodeRelease(pVnode); return 0; } @@ -686,360 +684,3 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { return vnodeResetTsdb(pVnode); } - -static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { - char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; - sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId); - FILE *fp = fopen(cfgFile, "w"); - if (!fp) { - vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; - } - - int32_t len = 0; - int32_t maxLen = 1000; - char * content = calloc(1, maxLen + 1); - if (content == NULL) { - fclose(fp); - return TSDB_CODE_VND_OUT_OF_MEMORY; - } - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pVnodeCfg->db); - len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion); - len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize); - len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks); - // len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables); - len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile); - len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep); - len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1); - len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2); - len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); - len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); - // len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); - len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); - len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); - len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel); - len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnodeCfg->cfg.fsyncPeriod); - len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); - len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); - len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); - - len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); - - vInfo("vgId:%d, save vnode cfg, replica:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.replications); - for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { - len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId); - len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", pVnodeCfg->nodes[i].nodeEp); - vInfo("vgId:%d, save vnode cfg, nodeId:%d nodeEp:%s", pVnodeCfg->cfg.vgId, pVnodeCfg->nodes[i].nodeId, - pVnodeCfg->nodes[i].nodeEp); - - if (i < pVnodeCfg->cfg.replications - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - fflush(fp); - fclose(fp); - free(content); - - vInfo("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId); - - return TSDB_CODE_SUCCESS; -} - -static int32_t vnodeReadCfg(SVnodeObj *pVnode) { - cJSON *root = NULL; - char *content = NULL; - char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; - int maxLen = 1000; - - terrno = TSDB_CODE_VND_APP_ERROR; - sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId); - FILE *fp = fopen(cfgFile, "r"); - if (!fp) { - vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId, - cfgFile, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto PARSE_OVER; - } - - content = calloc(1, maxLen + 1); - if (content == NULL) goto PARSE_OVER; - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId); - free(content); - fclose(fp); - return errno; - } - - root = cJSON_Parse(content); - if (root == NULL) { - vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId); - goto PARSE_OVER; - } - - cJSON *db = cJSON_GetObjectItem(root, "db"); - if (!db || db->type != cJSON_String || db->valuestring == NULL) { - vError("vgId:%d, failed to read vnode cfg, db not found", pVnode->vgId); - goto PARSE_OVER; - } - strcpy(pVnode->db, db->valuestring); - - cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion"); - if (!cfgVersion || cfgVersion->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->cfgVersion = cfgVersion->valueint; - - cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); - if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.cacheBlockSize = cacheBlockSize->valueint; - - cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); - if (!totalBlocks || totalBlocks->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, totalBlocks not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint; - - // cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables"); - // if (!maxTables || maxTables->type != cJSON_Number) { - // vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId); - // goto PARSE_OVER; - // } - // pVnode->tsdbCfg.maxTables = maxTables->valueint; - - cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); - if (!daysPerFile || daysPerFile->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint; - - cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep"); - if (!daysToKeep || daysToKeep->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, daysToKeep not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.keep = daysToKeep->valueint; - - cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); - if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, daysToKeep1 not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.keep1 = daysToKeep1->valueint; - - cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); - if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, daysToKeep2 not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.keep2 = daysToKeep2->valueint; - - cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); - if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint; - - cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); - if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, maxRowsPerFileBlock not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; - - // cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime"); - // if (!commitTime || commitTime->type != cJSON_Number) { - // vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId); - // goto PARSE_OVER; - // } - // pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint; - - cJSON *precision = cJSON_GetObjectItem(root, "precision"); - if (!precision || precision->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, precision not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.precision = (int8_t)precision->valueint; - - cJSON *compression = cJSON_GetObjectItem(root, "compression"); - if (!compression || compression->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, compression not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->tsdbCfg.compression = (int8_t)compression->valueint; - - cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel"); - if (!walLevel || walLevel->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, walLevel not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->walCfg.walLevel = (int8_t) walLevel->valueint; - - cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync"); - if (!walLevel || walLevel->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, fsyncPeriod not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint; - - cJSON *wals = cJSON_GetObjectItem(root, "wals"); - if (!wals || wals->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->walCfg.wals = (int8_t)wals->valueint; - pVnode->walCfg.keep = 0; - - cJSON *replica = cJSON_GetObjectItem(root, "replica"); - if (!replica || replica->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, replica not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->syncCfg.replica = (int8_t)replica->valueint; - - cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); - if (!quorum || quorum->type != cJSON_Number) { - vError("vgId: %d, failed to read vnode cfg, quorum not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->syncCfg.quorum = (int8_t)quorum->valueint; - - cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); - if (!nodeInfos || nodeInfos->type != cJSON_Array) { - vError("vgId:%d, failed to read vnode cfg, nodeInfos not found", pVnode->vgId); - goto PARSE_OVER; - } - - int size = cJSON_GetArraySize(nodeInfos); - if (size != pVnode->syncCfg.replica) { - vError("vgId:%d, failed to read vnode cfg, nodeInfos size not matched", pVnode->vgId); - goto PARSE_OVER; - } - - for (int i = 0; i < size; ++i) { - cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); - if (nodeInfo == NULL) continue; - - cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); - if (!nodeId || nodeId->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode cfg, nodeId not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint; - - cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); - if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - vError("vgId:%d, failed to read vnode cfg, nodeFqdn not found", pVnode->vgId); - goto PARSE_OVER; - } - - taosGetFqdnPortFromEp(nodeEp->valuestring, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort); - pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; - } - - terrno = TSDB_CODE_SUCCESS; - - vInfo("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); - for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { - vInfo("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId, - pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort); - } - -PARSE_OVER: - taosTFree(content); - cJSON_Delete(root); - if (fp) fclose(fp); - return terrno; -} - -static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { - char versionFile[TSDB_FILENAME_LEN + 30] = {0}; - sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); - FILE *fp = fopen(versionFile, "w"); - if (!fp) { - vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId, - versionFile, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - - int32_t len = 0; - int32_t maxLen = 30; - char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0}; - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion); - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - fflush(fp); - fclose(fp); - - vInfo("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion); - - return TSDB_CODE_SUCCESS; -} - -static int32_t vnodeReadVersion(SVnodeObj *pVnode) { - char versionFile[TSDB_FILENAME_LEN + 30] = {0}; - char *content = NULL; - cJSON *root = NULL; - int maxLen = 100; - - terrno = TSDB_CODE_VND_INVALID_VRESION_FILE; - sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); - FILE *fp = fopen(versionFile, "r"); - if (!fp) { - if (errno != ENOENT) { - vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - } else { - terrno = TSDB_CODE_SUCCESS; - } - goto PARSE_OVER; - } - - content = calloc(1, maxLen + 1); - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId); - goto PARSE_OVER; - } - - root = cJSON_Parse(content); - if (root == NULL) { - vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId); - goto PARSE_OVER; - } - - cJSON *ver = cJSON_GetObjectItem(root, "version"); - if (!ver || ver->type != cJSON_Number) { - vError("vgId:%d, failed to read vnode version, version not found", pVnode->vgId); - goto PARSE_OVER; - } - pVnode->version = ver->valueint; - - terrno = TSDB_CODE_SUCCESS; - vInfo("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version); - -PARSE_OVER: - taosTFree(content); - cJSON_Delete(root); - if (fp) fclose(fp); - return terrno; -} diff --git a/src/vnode/src/vnodeVersion.c b/src/vnode/src/vnodeVersion.c new file mode 100644 index 0000000000..daac62f7c2 --- /dev/null +++ b/src/vnode/src/vnodeVersion.c @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "cJSON.h" +#include "tglobal.h" +#include "tsdb.h" +#include "vnodeInt.h" +#include "vnodeVersion.h" + +int32_t vnodeReadVersion(SVnodeObj *pVnode) { + int32_t ret = TSDB_CODE_SUCCESS; + int32_t len = 0; + int32_t maxLen = 100; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; + + char file[TSDB_FILENAME_LEN + 30] = {0}; + sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); + + fp = fopen(file, "r"); + if (!fp) { + if (errno != ENOENT) { + vError("vgId:%d, failed to read %s, error:%s", pVnode->vgId, file, strerror(errno)); + ret = TAOS_SYSTEM_ERROR(errno); + } else { + ret = TSDB_CODE_SUCCESS; + } + goto PARSE_VER_ERROR; + } + + fread(content, 1, maxLen, fp); + if (len <= 0) { + vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file); + goto PARSE_VER_ERROR; + } + + root = cJSON_Parse(content); + if (root == NULL) { + vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file); + goto PARSE_VER_ERROR; + } + + cJSON *ver = cJSON_GetObjectItem(root, "version"); + if (!ver || ver->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file); + goto PARSE_VER_ERROR; + } + pVnode->version = ver->valueint; + + ret = TSDB_CODE_SUCCESS; + vInfo("vgId:%d, read %s successfully, version:%" PRId64, pVnode->vgId, file, pVnode->version); + +PARSE_VER_ERROR: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + return ret; +} + +int32_t vnodeSaveVersion(SVnodeObj *pVnode) { + char file[TSDB_FILENAME_LEN + 30] = {0}; + sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); + + FILE *fp = fopen(file, "w"); + if (!fp) { + vError("vgId:%d, failed to write %s, reason:%s", pVnode->vgId, file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 100; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fflush(fp); + fclose(fp); + free(content); + + vInfo("vgId:%d, successed to write %s, version:%" PRId64, pVnode->vgId, file, pVnode->fversion); + return TSDB_CODE_SUCCESS; +} \ No newline at end of file -- GitLab