提交 2861c026 编写于 作者: S Shengliang Guan

TD-1762

上级 0e6c237d
......@@ -490,7 +490,7 @@ static bool balanceMontiorDropping() {
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
if (pDnode->lastAccess + tsOfflineThreshold > tsAccessSquence) continue;
if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) continue;
if (dnodeIsMasterEp(pDnode->dnodeEp)) continue;
if (mnodeGetDnodesNum() <= 1) continue;
mLInfo("dnode:%d, set to removing state for it offline:%d seconds", pDnode->dnodeId,
......
......@@ -922,13 +922,13 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCMCreateAcctMsg);
pCmd->payloadLen = sizeof(SCreateAcctMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SCMCreateAcctMsg *pAlterMsg = (SCMCreateAcctMsg *)pCmd->payload;
SCreateAcctMsg *pAlterMsg = (SCreateAcctMsg *)pCmd->payload;
SStrToken *pName = &pInfo->pDCLInfo->user.user;
SStrToken *pPwd = &pInfo->pDCLInfo->user.passwd;
......@@ -1461,14 +1461,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CM_CONNECT;
pCmd->payloadLen = sizeof(SCMConnectMsg);
pCmd->payloadLen = sizeof(SConnectMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload;
// TODO refactor full_name
char *db; // ugly code to move the space
......@@ -1987,7 +1987,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res;
SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
SConnectRsp *pConnect = (SConnectRsp *)pRes->pRsp;
tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_CFG_H
#define TDENGINE_DNODE_CFG_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitCfg();
void dnodeCleanupCfg();
void dnodeUpdateCfg(SDnodeCfg *cfg);
int32_t dnodeGetDnodeId();
void dnodeGetCfg(int32_t *dnodeId, char *clusterId);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_EP_H
#define TDENGINE_DNODE_EP_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
int32_t dnodeInitEps();
void dnodeCleanupEps();
void dnodeUpdateEps(SDnodeEps *eps);
void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MINFOS_H
#define TDENGINE_DNODE_MINFOS_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
int32_t dnodeInitMInfos();
void dnodeCleanupMInfos();
void dnodeUpdateMInfos(SMnodeInfos *minfos);
void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetMInfos(SMnodeInfos *minfos);
bool dnodeIsMasterEp(char *ep);
#ifdef __cplusplus
}
#endif
#endif
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include "trpc.h"
int32_t dnodeInitMgmt();
void dnodeCleanupMgmt();
int32_t dnodeInitMgmtTimer();
......@@ -35,8 +37,8 @@ void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeCfg.h"
static SDnodeCfg tsCfg = {0};
static pthread_mutex_t tsCfgMutex;
static int32_t dnodeReadCfg();
static int32_t dnodeWriteCfg();
static void dnodeResetCfg(SDnodeCfg *cfg);
static void dnodePrintCfg(SDnodeCfg *cfg);
int32_t dnodeInitCfg() {
pthread_mutex_init(&tsCfgMutex, NULL);
dnodeResetCfg(NULL);
return dnodeReadCfg();
}
void dnodeCleanupCfg() { pthread_mutex_destroy(&tsCfgMutex); }
void dnodeUpdateCfg(SDnodeCfg *cfg) {
if (tsCfg.dnodeId != 0) return;
dnodeResetCfg(cfg);
}
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsCfgMutex);
dnodeId = tsCfg.dnodeId;
pthread_mutex_unlock(&tsCfgMutex);
return dnodeId;
}
void dnodeGetCfg(int32_t *dnodeId, char *clusterId) {
pthread_mutex_lock(&tsCfgMutex);
*dnodeId = tsCfg.dnodeId;
tstrncpy(clusterId, tsCfg.clusterId, TSDB_CLUSTER_ID_LEN);
pthread_mutex_unlock(&tsCfgMutex);
}
static void dnodeResetCfg(SDnodeCfg *cfg) {
if (cfg == NULL) return;
if (cfg->dnodeId == 0) return;
pthread_mutex_lock(&tsCfgMutex);
tsCfg.dnodeId = cfg->dnodeId;
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodePrintCfg(cfg);
dnodeWriteCfg();
pthread_mutex_unlock(&tsCfgMutex);
}
static void dnodePrintCfg(SDnodeCfg *cfg) {
dInfo("dnodeId is set to %d, clusterId is set to %s", cfg->dnodeId, cfg->clusterId);
}
static int32_t dnodeReadCfg() {
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SDnodeCfg cfg = {0};
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeCfg.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PARSE_CFG_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PARSE_CFG_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PARSE_CFG_OVER;
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", file);
goto PARSE_CFG_OVER;
}
cfg.dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s, clusterId not found", file);
goto PARSE_CFG_OVER;
}
tstrncpy(cfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN);
dInfo("read file %s successed", file);
PARSE_CFG_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
dnodeResetCfg(&cfg);
return 0;
}
static int32_t dnodeWriteCfg() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
dInfo("successed to write %s", file);
return 0;
}
......@@ -15,9 +15,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnodeInt.h"
#include "dnodeCheck.h"
......@@ -30,8 +28,8 @@ typedef struct {
void (*stopFp)();
} SCheckItem;
static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}};
int64_t tsMinFreeMemSizeForStart = 0;
static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}};
int64_t tsMinFreeMemSizeForStart = 0;
static int bindTcpPort(int port) {
int serverSocket;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "hash.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeEps.h"
static SDnodeEps *tsEps = NULL;
static SHashObj * tsEpsHash = NULL;
static pthread_mutex_t tsEpsMutex;
static int32_t dnodeReadEps();
static int32_t dnodeWriteEps();
static void dnodeResetEps(SDnodeEps *eps);
static void dnodePrintEps(SDnodeEps *eps);
int32_t dnodeInitEps() {
pthread_mutex_init(&tsEpsMutex, NULL);
tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
dnodeResetEps(NULL);
return dnodeReadEps();
}
void dnodeCleanupEps() {
pthread_mutex_lock(&tsEpsMutex);
if (tsEps) {
free(tsEps);
tsEps = NULL;
}
if (tsEpsHash) {
taosHashCleanup(tsEpsHash);
tsEpsHash = NULL;
}
pthread_mutex_unlock(&tsEpsMutex);
pthread_mutex_destroy(&tsEpsMutex);
}
void dnodeUpdateEps(SDnodeEps *eps) {
if (eps == NULL) return;
eps->dnodeNum = htonl(eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; ++i) {
eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId);
eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort);
}
pthread_mutex_lock(&tsEpsMutex);
if (eps->dnodeNum != tsEps->dnodeNum) {
dnodeResetEps(eps);
dnodeWriteEps();
} else {
int32_t size = sizeof(SDnodeEps) + eps->dnodeNum * sizeof(SDnodeEp);
if (memcmp(eps, tsEps, size) != 0) {
dnodeResetEps(eps);
dnodeWriteEps();
}
}
pthread_mutex_unlock(&tsEpsMutex);
}
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr) {
bool changed = false;
pthread_mutex_lock(&tsEpsMutex);
SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
changed = strcmp(epstr, epSaved) != 0;
}
pthread_mutex_unlock(&tsEpsMutex);
return changed;
}
void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsEpsMutex);
SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
if (port) *port = ep->dnodePort;
if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
}
pthread_mutex_unlock(&tsEpsMutex);
}
static void dnodeResetEps(SDnodeEps *eps) {
if (eps == NULL) {
int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp);
if (tsEps == NULL) {
tsEps = calloc(1, size);
} else {
tsEps->dnodeNum = 0;
}
} else {
assert(tsEps);
int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp) * eps->dnodeNum;
if (eps->dnodeNum > tsEps->dnodeNum) {
tsEps = realloc(tsEps, size);
}
memcpy(tsEps, eps, size);
dnodePrintEps(eps);
}
for (int32_t i = 0; i < tsEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps->dnodeEps[i];
taosHashPut(tsEpsHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
}
}
static void dnodePrintEps(SDnodeEps *eps) {
dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; i++) {
SDnodeEp *ep = &eps->dnodeEps[i];
dDebug("dnodeId:%d, dnodeFqdn:%s dnodePort:%u", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort);
}
}
static int32_t dnodeReadEps() {
int32_t ret = -1;
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SDnodeEps *eps = NULL;
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeEps.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PRASE_EPS_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PRASE_EPS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeNum = cJSON_GetObjectItem(root, "dnodeNum");
if (!dnodeNum || dnodeNum->type != cJSON_Number) {
dError("failed to read %s, dnodeNum not found", file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s, dnodeInfos not found", file);
goto PRASE_EPS_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize != dnodeNum->valueint) {
dError("failed to read %s, dnodeInfos size:%d not matched dnodeNum:%d", file, dnodeInfosSize,
(int32_t)dnodeNum->valueint);
goto PRASE_EPS_OVER;
}
int32_t epsSize = sizeof(SDnodeEps) + dnodeInfosSize * sizeof(SDnodeEp);
eps = calloc(1, epsSize);
eps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
SDnodeEp *ep = &eps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", file);
goto PRASE_EPS_OVER;
}
ep->dnodeId = dnodeId->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", file);
goto PRASE_EPS_OVER;
}
strncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s, dnodePort not found", file);
goto PRASE_EPS_OVER;
}
ep->dnodePort = (uint16_t)dnodePort->valueint;
}
ret = 0;
dInfo("read file %s successed", file);
dnodePrintEps(eps);
PRASE_EPS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (ret != 0) {
if (eps) free(eps);
eps = NULL;
}
dnodeResetEps(eps);
if (eps) free(eps);
return 0;
}
static int32_t dnodeWriteEps() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeEps.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", tsEps->dnodeNum);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": %u\n", ep->dnodePort);
if (i < tsEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
dInfo("successed to write %s", file);
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMInfos.h"
static SMnodeInfos tsMInfos;
static SRpcEpSet tsMEpSet;
static pthread_mutex_t tsMInfosMutex;
static void dnodeResetMInfos(SMnodeInfos *minfos);
static void dnodePrintMInfos(SMnodeInfos *minfos);
static int32_t dnodeReadMInfos();
static int32_t dnodeWriteMInfos();
int32_t dnodeInitMInfos() {
pthread_mutex_init(&tsMInfosMutex, NULL);
dnodeResetMInfos(NULL);
return dnodeReadMInfos();
}
void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); }
void dnodeUpdateMInfos(SMnodeInfos *minfos) {
if (minfos->mnodeNum <= 0 || minfos->mnodeNum > 3) {
dError("invalid mnode infos, mnodeNum:%d", minfos->mnodeNum);
return;
}
for (int32_t i = 0; i < minfos->mnodeNum; ++i) {
SMnodeInfo *minfo = &minfos->mnodeInfos[i];
minfo->mnodeId = htonl(minfo->mnodeId);
if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) {
dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp);
return;
}
}
pthread_mutex_lock(&tsMInfosMutex);
if (minfos->mnodeNum != tsMInfos.mnodeNum) {
dnodeResetMInfos(minfos);
dnodeWriteMInfos();
sdbUpdateAsync();
} else {
int32_t size = sizeof(SMnodeInfos);
if (memcmp(minfos, &tsMInfos, size) != 0) {
dnodeResetMInfos(minfos);
dnodeWriteMInfos();
sdbUpdateAsync();
}
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) {
if (ep->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
return;
}
pthread_mutex_lock(&tsMInfosMutex);
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
for (int i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
}
tsMEpSet = *ep;
pthread_mutex_unlock(&tsMInfosMutex);
}
bool dnodeIsMasterEp(char *ep) {
pthread_mutex_lock(&tsMInfosMutex);
bool isMaster = strcmp(ep, tsMInfos.mnodeInfos[tsMEpSet.inUse].mnodeEp) == 0;
pthread_mutex_unlock(&tsMInfosMutex);
return isMaster;
}
void dnodeGetMInfos(SMnodeInfos *minfos) {
pthread_mutex_lock(&tsMInfosMutex);
memcpy(minfos, &tsMInfos, sizeof(SMnodeInfos));
for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) {
minfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId);
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsMInfosMutex);
*epSet = tsMEpSet;
for (int i = 0; i < epSet->numOfEps; ++i) {
epSet->port[i] += TSDB_PORT_DNODEDNODE;
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsMInfosMutex);
*epSet = tsMEpSet;
pthread_mutex_unlock(&tsMInfosMutex);
}
static void dnodePrintMInfos(SMnodeInfos *minfos) {
dInfo("print mnode infos, mnodeNum:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse);
for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) {
dInfo("mnode index:%d, %s", tsMInfos.mnodeInfos[i].mnodeId, tsMInfos.mnodeInfos[i].mnodeEp);
}
}
static void dnodeResetMInfos(SMnodeInfos *minfos) {
if (minfos == NULL) {
tsMEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]);
if (strcmp(tsSecond, tsFirst) != 0) {
tsMEpSet.numOfEps = 2;
taosGetFqdnPortFromEp(tsSecond, tsMEpSet.fqdn[1], &tsMEpSet.port[1]);
}
return;
}
if (minfos->mnodeNum == 0) return;
int32_t size = sizeof(SMnodeInfos);
memcpy(&tsMInfos, minfos, size);
tsMEpSet.inUse = tsMInfos.inUse;
tsMEpSet.numOfEps = tsMInfos.mnodeNum;
for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) {
taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]);
}
dnodePrintMInfos(minfos);
}
static int32_t dnodeReadMInfos() {
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SMnodeInfos minfos = {0};
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PARSE_MINFOS_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PARSE_MINFOS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PARSE_MINFOS_OVER;
}
cJSON *inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, inUse not found");
goto PARSE_MINFOS_OVER;
}
tsMInfos.inUse = inUse->valueint;
cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeNum not found");
goto PARSE_MINFOS_OVER;
}
minfos.mnodeNum = nodeNum->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnodeEpSet.json, nodeInfos not found");
goto PARSE_MINFOS_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != minfos.mnodeNum) {
dError("failed to read mnodeEpSet.json, nodeInfos size not matched");
goto PARSE_MINFOS_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeId not found");
goto PARSE_MINFOS_OVER;
}
minfos.mnodeInfos[i].mnodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnodeEpSet.json, nodeName not found");
goto PARSE_MINFOS_OVER;
}
strncpy(minfos.mnodeInfos[i].mnodeEp, nodeEp->valuestring, TSDB_EP_LEN);
}
dInfo("read file %s successed", file);
dnodePrintMInfos(&minfos);
PARSE_MINFOS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
dnodeResetMInfos(&minfos);
return 0;
}
static int32_t dnodeWriteMInfos() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMInfos.mnodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMInfos.mnodeInfos[i].mnodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMInfos.mnodeInfos[i].mnodeEp);
if (i < tsMInfos.mnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
dInfo("successed to write %s", file);
return 0;
}
......@@ -24,6 +24,9 @@
#include "dnodeMgmt.h"
#include "dnodePeer.h"
#include "dnodeModule.h"
#include "dnodeEps.h"
#include "dnodeMInfos.h"
#include "dnodeCfg.h"
#include "dnodeCheck.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
......@@ -33,23 +36,27 @@
#include "dnodeShell.h"
#include "dnodeTelemetry.h"
static int32_t dnodeInitStorage();
static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitStorage();
static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir);
static int32_t dnodeInitComponents();
static void dnodeCleanupComponents(int32_t stepId);
static int dnodeCreateDir(const char *dir);
static void dnodeCleanupComponents(int32_t stepId);
static int dnodeCreateDir(const char *dir);
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
int32_t (*init)();
void (*cleanup)();
} SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"eps", dnodeInitEps, dnodeCleanupEps},
{"minfos", dnodeInitMInfos, dnodeCleanupMInfos},
{"cfg", dnodeInitCfg, dnodeCleanupCfg},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
......
......@@ -31,12 +31,13 @@
#include "mnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeEps.h"
#include "dnodeCfg.h"
#include "dnodeMInfos.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeModule.h"
#define MPEER_CONTENT_LEN 2000
typedef struct {
pthread_t thread;
int32_t threadIndex;
......@@ -46,23 +47,13 @@ typedef struct {
int32_t * vnodeList;
} SOpenVnodeThread;
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static SRpcEpSet tsDMnodeEpSet = {0};
static SDMMnodeInfos tsDMnodeInfos = {0};
static SDMDnodeCfg tsDnodeCfg = {0};
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
static bool dnodeReadMnodeInfos();
static void dnodeSaveMnodeInfos();
static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg);
static bool dnodeReadDnodeCfg();
static void dnodeSaveDnodeCfg();
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void *dnodeProcessMgmtQueue(void *param);
......@@ -74,7 +65,7 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() {
......@@ -86,27 +77,8 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
dnodeReadDnodeCfg();
tsRebootTime = taosGetTimestampSec();
if (!dnodeReadMnodeInfos()) {
memset(&tsDMnodeEpSet, 0, sizeof(SRpcEpSet));
memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos));
tsDMnodeEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, tsDMnodeEpSet.fqdn[0], &tsDMnodeEpSet.port[0]);
if (strcmp(tsSecond, tsFirst) != 0) {
tsDMnodeEpSet.numOfEps = 2;
taosGetFqdnPortFromEp(tsSecond, tsDMnodeEpSet.fqdn[1], &tsDMnodeEpSet.port[1]);
}
} else {
tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse;
tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]);
}
}
int32_t code = vnodeInitResources();
if (code != TSDB_CODE_SUCCESS) {
......@@ -470,10 +442,10 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum);
for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) {
pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp);
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum);
for (int i = 0; i < pCfg->mnodes.mnodeNum; ++i) {
pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
}
dnodeStartMnode(&pCfg->mnodes);
......@@ -481,34 +453,6 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
if (pEpSet->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
return;
}
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
}
tsDMnodeEpSet = *pEpSet;
}
void dnodeGetMnodeEpSetForPeer(void *epSetRaw) {
SRpcEpSet *epSet = epSetRaw;
*epSet = tsDMnodeEpSet;
for (int i=0; i<epSet->numOfEps; ++i)
epSet->port[i] += TSDB_PORT_DNODEDNODE;
}
void dnodeGetMnodeEpSetForShell(void *epSetRaw) {
SRpcEpSet *epSet = epSetRaw;
*epSet = tsDMnodeEpSet;
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
......@@ -517,201 +461,23 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
}
SDMStatusRsp *pStatusRsp = pMsg->pCont;
SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes;
if (pMnodes->nodeNum <= 0) {
dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SMnodeInfos *minfos = &pStatusRsp->mnodes;
dnodeUpdateMInfos(minfos);
SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->moduleStatus = htonl(pCfg->moduleStatus);
pCfg->dnodeId = htonl(pCfg->dnodeId);
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
}
dnodeUpdateCfg(pCfg);
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
// will not set mnode in status msg
// dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg);
SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SDMVgroupAccess));
dnodeUpdateEps(pEps);
dnodeUpdateMnodeInfos(pMnodes);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}
static bool dnodeCheckMnodeInfos(SDMMnodeInfos *pMnodes) {
if (pMnodes->nodeNum <= 0 || pMnodes->nodeNum > 3) {
dError("invalid mnode infos, num:%d", pMnodes->nodeNum);
return false;
}
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
if (pMnodeInfo->nodeId <= 0 || strlen(pMnodeInfo->nodeEp) <= 5) {
dError("invalid mnode info:%d, nodeId:%d nodeEp:%s", i, pMnodeInfo->nodeId, pMnodeInfo->nodeEp);
return false;
}
}
return true;
}
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
bool mnodesChanged = (memcmp(&tsDMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0);
bool mnodesNotInit = (tsDMnodeInfos.nodeNum == 0);
if (!(mnodesChanged || mnodesNotInit)) return;
if (!dnodeCheckMnodeInfos(pMnodes)) return;
memcpy(&tsDMnodeInfos, pMnodes, sizeof(SDMMnodeInfos));
dInfo("mnode infos is changed, nodeNum:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse);
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
dInfo("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
}
tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse;
tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]);
}
dnodeSaveMnodeInfos();
sdbUpdateAsync();
}
static bool dnodeReadMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN*2] = {0};
sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r");
if (!fp) {
dDebug("failed to read mnodeEpSet.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 2000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read mnodeEpSet.json, content is null");
return false;
}
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read mnodeEpSet.json, invalid json format");
goto PARSE_OVER;
}
cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, inUse not found");
goto PARSE_OVER;
}
tsDMnodeInfos.inUse = inUse->valueint;
cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeNum not found");
goto PARSE_OVER;
}
tsDMnodeInfos.nodeNum = nodeNum->valueint;
cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnodeEpSet.json, nodeInfos not found");
goto PARSE_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != tsDMnodeInfos.nodeNum) {
dError("failed to read mnodeEpSet.json, nodeInfos size not matched");
goto PARSE_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeId not found");
goto PARSE_OVER;
}
tsDMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnodeEpSet.json, nodeName not found");
goto PARSE_OVER;
}
strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
}
ret = true;
dInfo("read mnode epSet successed, numOfEps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse);
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
dInfo("mnode:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
}
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDMnodeInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDMnodeInfos.nodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDMnodeInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDMnodeInfos.nodeInfos[i].nodeEp);
if (i < tsDMnodeInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
dInfo("save mnode epSet successed");
}
char *dnodeGetMnodeMasterEp() {
return tsDMnodeInfos.nodeInfos[tsDMnodeEpSet.inUse].nodeEp;
}
void* dnodeGetMnodeInfos() {
return &tsDMnodeInfos;
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
......@@ -732,14 +498,13 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return;
}
//strcpy(pStatus->dnodeName, tsDnodeName);
dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
pStatus->dnodeId = htonl(dnodeGetDnodeId());
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
tstrncpy(pStatus->clusterId, tsDnodeCfg.clusterId, TSDB_CLUSTER_ID_LEN);
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
// fill cluster cfg parameters
......@@ -769,110 +534,19 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
};
SRpcEpSet epSet;
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeGetEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, &rpcMsg);
}
static bool dnodeReadDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN*2] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "r");
if (!fp) {
dDebug("failed to read dnodeCfg.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 100;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read dnodeCfg.json, content is null");
return false;
}
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read dnodeCfg.json, invalid json format");
goto PARSE_CFG_OVER;
}
cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read dnodeCfg.json, dnodeId not found");
goto PARSE_CFG_OVER;
}
tsDnodeCfg.dnodeId = dnodeId->valueint;
cJSON* clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read dnodeCfg.json, clusterId not found");
goto PARSE_CFG_OVER;
}
tstrncpy(tsDnodeCfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN);
ret = true;
dInfo("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId);
PARSE_CFG_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsDnodeCfg.dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsDnodeCfg.clusterId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
dInfo("save dnodeId successed");
}
void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
if (tsDnodeCfg.dnodeId == 0) {
dInfo("dnodeId is set to %d, clusterId is set to %s", pCfg->dnodeId, pCfg->clusterId);
tsDnodeCfg.dnodeId = pCfg->dnodeId;
tstrncpy(tsDnodeCfg.clusterId, pCfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodeSaveDnodeCfg();
}
}
int32_t dnodeGetDnodeId() {
return tsDnodeCfg.dnodeId;
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetMnodeEpSetForShell(&epSet);
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
......
......@@ -146,8 +146,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
}
}
bool dnodeStartMnode(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
bool dnodeStartMnode(SMnodeInfos *minfos) {
SMnodeInfos *mnodes = minfos;
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
dDebug("mnode module is already started, module status:%d", tsModuleStatus);
......
......@@ -28,8 +28,8 @@
#include "dnodeMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h"
extern void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet);
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
......@@ -151,7 +151,7 @@ void dnodeCleanupClient() {
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEpSetForPeer(pEpSet);
dnodeUpdateEpSetForPeer(pEpSet);
}
if (dnodeProcessRspMsgFp[pMsg->msgType]) {
......@@ -173,7 +173,7 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeGetEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
}
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "trpc.h"
#include "taosmsg.h"
typedef struct {
int32_t queryReqNum;
......@@ -38,12 +39,13 @@ SDnodeRunStatus dnodeGetRunStatus();
SDnodeStatisInfo dnodeGetStatisInfo();
bool dnodeIsFirstDeploy();
char * dnodeGetMnodeMasterEp();
void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void * dnodeGetMnodeInfos();
bool dnodeIsMasterEp(char *ep);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
int32_t dnodeGetDnodeId();
bool dnodeStartMnode(void *pModes);
void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr);
bool dnodeStartMnode(SMnodeInfos *minfos);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
......
......@@ -308,12 +308,12 @@ typedef struct {
} SUpdateTableTagValMsg;
typedef struct {
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN];
char appName[TSDB_APPNAME_LEN];
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN];
char appName[TSDB_APPNAME_LEN];
int32_t pid;
} SCMConnectMsg;
} SConnectMsg;
typedef struct {
char acctId[TSDB_ACCT_LEN];
......@@ -324,7 +324,7 @@ typedef struct {
int8_t reserved2;
int32_t connId;
SRpcEpSet epSet;
} SCMConnectRsp;
} SConnectRsp;
typedef struct {
int32_t maxUsers;
......@@ -344,7 +344,7 @@ typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
SAcctCfg cfg;
} SCMCreateAcctMsg, SCMAlterAcctMsg;
} SCreateAcctMsg, SAlterAcctMsg;
typedef struct {
char user[TSDB_USER_LEN];
......@@ -568,18 +568,29 @@ typedef struct {
uint32_t numOfVnodes;
char clusterId[TSDB_CLUSTER_ID_LEN];
char reserved[16];
} SDMDnodeCfg;
} SDnodeCfg;
typedef struct {
int32_t dnodeId;
uint16_t dnodePort;
char dnodeFqdn[TSDB_FQDN_LEN];
} SDnodeEp;
typedef struct {
int32_t dnodeNum;
SDnodeEp dnodeEps[];
} SDnodeEps;
typedef struct {
int32_t nodeId;
char nodeEp[TSDB_EP_LEN];
} SDMMnodeInfo;
int32_t mnodeId;
char mnodeEp[TSDB_EP_LEN];
} SMnodeInfo;
typedef struct {
int8_t inUse;
int8_t nodeNum;
SDMMnodeInfo nodeInfos[TSDB_MAX_REPLICA];
} SDMMnodeInfos;
int8_t inUse;
int8_t mnodeNum;
SMnodeInfo mnodeInfos[TSDB_MAX_REPLICA];
} SMnodeInfos;
typedef struct {
int32_t numOfMnodes; // tsNumOfMnodes
......@@ -614,9 +625,9 @@ typedef struct {
} SDMStatusMsg;
typedef struct {
SDMMnodeInfos mnodes;
SDMDnodeCfg dnodeCfg;
SDMVgroupAccess vgAccess[];
SMnodeInfos mnodes;
SDnodeCfg dnodeCfg;
SDMVgroupAccess vgAccess[];
} SDMStatusRsp;
typedef struct {
......@@ -742,7 +753,7 @@ typedef struct {
typedef struct {
int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
SDMMnodeInfos mnodes;
SMnodeInfos mnodes;
} SMDCreateMnodeMsg;
typedef struct {
......
......@@ -39,11 +39,15 @@
#include "mnodeCluster.h"
int32_t tsAccessSquence = 0;
static void *tsDnodeSdb = NULL;
static void * tsDnodeSdb = NULL;
static int32_t tsDnodeUpdateSize = 0;
extern void * tsMnodeSdb;
extern void * tsVgroupSdb;
static SDnodeEps*tsDnodeEps;
static int32_t tsDnodeEpsSize;
static pthread_mutex_t tsDnodeEpsMutex;
static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg);
......@@ -59,6 +63,7 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole);
static void mnodeUpdateDnodeEps();
static char* offlineReason[] = {
"",
......@@ -95,6 +100,9 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
pDnode->offlineReason = TAOS_DN_OFF_STATUS_NOT_RECEIVED;
}
dnodeUpdateEp(pDnode->dnodeId, pDnode->dnodeEp, pDnode->dnodeFqdn, &pDnode->dnodePort);
mnodeUpdateDnodeEps();
mInfo("dnode:%d, fqdn:%s ep:%s port:%d, do insert action", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp, pDnode->dnodePort);
return TSDB_CODE_SUCCESS;
}
......@@ -107,6 +115,7 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
#endif
mnodeDropMnodeLocal(pDnode->dnodeId);
balanceAsyncNotify();
mnodeUpdateDnodeEps();
mDebug("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId);
return TSDB_CODE_SUCCESS;
......@@ -121,6 +130,7 @@ static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
}
mnodeDecDnodeRef(pDnode);
mnodeUpdateDnodeEps();
return TSDB_CODE_SUCCESS;
}
......@@ -152,12 +162,14 @@ static int32_t mnodeDnodeActionRestored() {
}
}
mnodeUpdateDnodeEps();
return TSDB_CODE_SUCCESS;
}
int32_t mnodeInitDnodes() {
SDnodeObj tObj;
tsDnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
pthread_mutex_init(&tsDnodeEpsMutex, NULL);
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_DNODE,
......@@ -201,6 +213,9 @@ int32_t mnodeInitDnodes() {
void mnodeCleanupDnodes() {
sdbCloseTable(tsDnodeSdb);
pthread_mutex_destroy(&tsDnodeEpsMutex);
free(tsDnodeEps);
tsDnodeEps = NULL;
tsDnodeSdb = NULL;
}
......@@ -418,6 +433,48 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
return 0;
}
static int32_t mnodeGetDnodeEpsSize() {
pthread_mutex_lock(&tsDnodeEpsMutex);
int32_t size = tsDnodeEpsSize;
pthread_mutex_unlock(&tsDnodeEpsMutex);
return size;
}
static void mnodeGetDnodeEpsData(SDnodeEps *pEps) {
pthread_mutex_lock(&tsDnodeEpsMutex);
memcpy(pEps, tsDnodeEps, tsDnodeEpsSize);
pthread_mutex_unlock(&tsDnodeEpsMutex);
}
static void mnodeUpdateDnodeEps() {
pthread_mutex_lock(&tsDnodeEpsMutex);
int32_t totalDnodes = mnodeGetDnodesNum();
tsDnodeEpsSize = sizeof(SDnodeEps) + totalDnodes * sizeof(SDnodeEp);
tsDnodeEps = calloc(1, tsDnodeEpsSize);
tsDnodeEps->dnodeNum = htonl(totalDnodes);
SDnodeObj *pDnode = NULL;
void * pIter = NULL;
int32_t dnodesNum = 0;
while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (dnodesNum >= totalDnodes) break;
SDnodeEp *pEp = &tsDnodeEps->dnodeEps[dnodesNum];
dnodesNum++;
pEp->dnodeId = htonl(pDnode->dnodeId);
pEp->dnodePort = htons(pDnode->dnodePort);
tstrncpy(pEp->dnodeFqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
mnodeDecDnodeRef(pDnode);
}
sdbFreeIter(pIter);
pthread_mutex_unlock(&tsDnodeEpsMutex);
}
static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
SDnodeObj *pDnode = NULL;
SDMStatusMsg *pStatus = pMsg->rpcMsg.pCont;
......@@ -477,7 +534,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
}
int32_t openVnodes = htons(pStatus->openVnodes);
int32_t contLen = sizeof(SDMStatusRsp) + openVnodes * sizeof(SDMVgroupAccess);
int32_t contLen = sizeof(SDMStatusRsp) + openVnodes * sizeof(SDMVgroupAccess) + mnodeGetDnodeEpsSize();
SDMStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mnodeDecDnodeRef(pDnode);
......@@ -489,7 +546,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
pRsp->dnodeCfg.numOfVnodes = htonl(openVnodes);
tstrncpy(pRsp->dnodeCfg.clusterId, mnodeGetClusterId(), TSDB_CLUSTER_ID_LEN);
SDMVgroupAccess *pAccess = (SDMVgroupAccess *)((char *)pRsp + sizeof(SDMStatusRsp));
for (int32_t j = 0; j < openVnodes; ++j) {
SVnodeLoad *pVload = &pStatus->load[j];
pVload->vgId = htonl(pVload->vgId);
......@@ -539,6 +596,9 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
mnodeDecDnodeRef(pDnode);
SDnodeEps *pEps = (SDnodeEps *)((char *)pAccess + openVnodes * sizeof(SDMVgroupAccess));
mnodeGetDnodeEpsData(pEps);
pMsg->rpcRsp.len = contLen;
pMsg->rpcRsp.rsp = pRsp;
......
......@@ -38,7 +38,7 @@ static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0;
static SRpcEpSet tsMnodeEpSetForShell;
static SRpcEpSet tsMnodeEpSetForPeer;
static SDMMnodeInfos tsMnodeInfos;
static SMnodeInfos tsMnodeInfos;
static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -70,8 +70,9 @@ static int32_t mnodeMnodeActionInsert(SSdbOper *pOper) {
pDnode->isMgmt = true;
mnodeDecDnodeRef(pDnode);
mInfo("mnode:%d, fqdn:%s ep:%s port:%d, do insert action", pMnode->mnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp, pDnode->dnodePort);
mInfo("mnode:%d, fqdn:%s ep:%s port:%u, do insert action", pMnode->mnodeId, pDnode->dnodeFqdn, pDnode->dnodeEp,
pDnode->dnodePort);
return TSDB_CODE_SUCCESS;
}
......@@ -202,7 +203,7 @@ void mnodeUpdateMnodeEpSet() {
memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet));
memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet));
memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
memset(&tsMnodeInfos, 0, sizeof(SMnodeInfos));
int32_t index = 0;
void * pIter = NULL;
......@@ -221,8 +222,8 @@ void mnodeUpdateMnodeEpSet() {
tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE);
mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index]));
tsMnodeInfos.nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
strcpy(tsMnodeInfos.nodeInfos[index].nodeEp, pDnode->dnodeEp);
tsMnodeInfos.mnodeInfos[index].mnodeId = htonl(pMnode->mnodeId);
strcpy(tsMnodeInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp);
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
tsMnodeEpSetForShell.inUse = index;
......@@ -238,7 +239,7 @@ void mnodeUpdateMnodeEpSet() {
mnodeDecMnodeRef(pMnode);
}
tsMnodeInfos.nodeNum = index;
tsMnodeInfos.mnodeNum = index;
tsMnodeEpSetForShell.numOfEps = index;
tsMnodeEpSetForPeer.numOfEps = index;
......@@ -260,12 +261,12 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) {
}
char* mnodeGetMnodeMasterEp() {
return tsMnodeInfos.nodeInfos[tsMnodeInfos.inUse].nodeEp;
return tsMnodeInfos.mnodeInfos[tsMnodeInfos.inUse].mnodeEp;
}
void mnodeGetMnodeInfos(void *mnodeInfos) {
mnodeMnodeRdLock();
*(SDMMnodeInfos *)mnodeInfos = tsMnodeInfos;
*(SMnodeInfos *)mnodeInfos = tsMnodeInfos;
mnodeMnodeUnLock();
}
......@@ -280,15 +281,15 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes = tsMnodeInfos;
bool found = false;
for (int i = 0; i < pCreate->mnodes.nodeNum; ++i) {
if (pCreate->mnodes.nodeInfos[i].nodeId == htonl(dnodeId)) {
for (int i = 0; i < pCreate->mnodes.mnodeNum; ++i) {
if (pCreate->mnodes.mnodeInfos[i].mnodeId == htonl(dnodeId)) {
found = true;
}
}
if (!found) {
pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeId = htonl(dnodeId);
tstrncpy(pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes.nodeNum++;
pCreate->mnodes.mnodeInfos[pCreate->mnodes.mnodeNum].mnodeId = htonl(dnodeId);
tstrncpy(pCreate->mnodes.mnodeInfos[pCreate->mnodes.mnodeNum].mnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes.mnodeNum++;
}
}
......
......@@ -305,7 +305,7 @@ void sdbUpdateAsync() {
}
void sdbUpdateSync(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
SMnodeInfos *mnodes = pMnodes;
if (!mnodeIsRunning()) {
mDebug("mnode not start yet, update sync config later");
return;
......@@ -339,10 +339,10 @@ void sdbUpdateSync(void *pMnodes) {
syncCfg.replica = index;
mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica);
} else {
for (index = 0; index < mnodes->nodeNum; ++index) {
SDMMnodeInfo *node = &mnodes->nodeInfos[index];
syncCfg.nodeInfo[index].nodeId = node->nodeId;
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort);
for (index = 0; index < mnodes->mnodeNum; ++index) {
SMnodeInfo *node = &mnodes->mnodeInfos[index];
syncCfg.nodeInfo[index].nodeId = node->mnodeId;
taosGetFqdnPortFromEp(node->mnodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort);
syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC;
}
syncCfg.replica = index;
......
......@@ -283,8 +283,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
SCMConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont;
SCMConnectRsp *pConnectRsp = NULL;
SConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont;
SConnectRsp *pConnectRsp = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SRpcConnInfo connInfo = {0};
......@@ -320,7 +320,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
mnodeDecDbRef(pDb);
}
pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp));
pConnectRsp = rpcMallocCont(sizeof(SConnectRsp));
if (pConnectRsp == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto connect_over;
......@@ -349,7 +349,7 @@ connect_over:
} else {
mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code));
pMsg->rpcRsp.rsp = pConnectRsp;
pMsg->rpcRsp.len = sizeof(SCMConnectRsp);
pMsg->rpcRsp.len = sizeof(SConnectRsp);
}
return code;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_CFG_H
#define TDENGINE_VNODE_CFG_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t vnodeReadCfg(SVnodeObj *pVnode);
int32_t vnodeWriteCfg(SMDCreateVnodeMsg *pVnodeCfg);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_VERSION_H
#define TDENGINE_VNODE_VERSION_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t vnodeReadVersion(SVnodeObj *pVnode);
int32_t vnodeSaveVersion(SVnodeObj *pVnode);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "cJSON.h"
#include "tglobal.h"
#include "tsdb.h"
#include "dnode.h"
#include "vnodeInt.h"
#include "vnodeVersion.h"
#include "vnodeCfg.h"
static void vnodeLoadCfg(SVnodeObj *pVnode, SMDCreateVnodeMsg* vnodeMsg) {
strcpy(pVnode->db, vnodeMsg->db);
pVnode->cfgVersion = vnodeMsg->cfg.cfgVersion;
pVnode->tsdbCfg.cacheBlockSize = vnodeMsg->cfg.cacheBlockSize;
pVnode->tsdbCfg.totalBlocks = vnodeMsg->cfg.totalBlocks;
pVnode->tsdbCfg.daysPerFile = vnodeMsg->cfg.daysPerFile;
pVnode->tsdbCfg.keep = vnodeMsg->cfg.daysToKeep;
pVnode->tsdbCfg.keep1 = vnodeMsg->cfg.daysToKeep1;
pVnode->tsdbCfg.keep2 = vnodeMsg->cfg.daysToKeep2;
pVnode->tsdbCfg.minRowsPerFileBlock = vnodeMsg->cfg.minRowsPerFileBlock;
pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock;
pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision;
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
pVnode->walCfg.wals = vnodeMsg->cfg.wals;
pVnode->walCfg.keep = 0;
pVnode->syncCfg.replica = vnodeMsg->cfg.replications;
pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum;
for (int i = 0; i < pVnode->syncCfg.replica; ++i) {
SMDVnodeDesc *node = &vnodeMsg->nodes[i];
pVnode->syncCfg.nodeInfo[i].nodeId = node->nodeId;
taosGetFqdnPortFromEp(node->nodeEp, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort);
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
}
vInfo("vgId:%d, load vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
SNodeInfo *node = &pVnode->syncCfg.nodeInfo[i];
vInfo("vgId:%d, dnode:%d, %s:%u", pVnode->vgId, node->nodeId, node->nodeFqdn, node->nodePort);
}
}
int32_t vnodeReadCfg(SVnodeObj *pVnode) {
int32_t ret = TSDB_CODE_VND_APP_ERROR;
int32_t len = 0;
int maxLen = 1000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
bool nodeChanged = false;
SMDCreateVnodeMsg vnodeMsg;
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf(file, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
fp = fopen(file, "r");
if (!fp) {
vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId, file, strerror(errno));
ret = TAOS_SYSTEM_ERROR(errno);
goto PARSE_VCFG_ERROR;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
cJSON *db = cJSON_GetObjectItem(root, "db");
if (!db || db->type != cJSON_String || db->valuestring == NULL) {
vError("vgId:%d, failed to read %s, db not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
strcpy(vnodeMsg.db, db->valuestring);
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!cfgVersion || cfgVersion->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.cfgVersion = cfgVersion->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, cacheBlockSize not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.cacheBlockSize = cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, totalBlocks not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.totalBlocks = totalBlocks->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysPerFile not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.daysPerFile = daysPerFile->valueint;
cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
if (!daysToKeep || daysToKeep->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.daysToKeep = daysToKeep->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep1 not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.daysToKeep1 = daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep2 not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.daysToKeep2 = daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, precision not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.precision = (int8_t)precision->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression");
if (!compression || compression->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, compression not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.compression = (int8_t)compression->valueint;
cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, walLevel not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.walLevel = (int8_t)walLevel->valueint;
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, fsyncPeriod not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.fsyncPeriod = fsyncPeriod->valueint;
cJSON *wals = cJSON_GetObjectItem(root, "wals");
if (!wals || wals->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, wals not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.wals = (int8_t)wals->valueint;
cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, replica not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.replications = (int8_t)replica->valueint;
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, quorum not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.quorum = (int8_t)quorum->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != vnodeMsg.cfg.replications) {
vError("vgId:%d, failed to read %s, nodeInfos size not matched", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
SMDVnodeDesc *node = &vnodeMsg.nodes[i];
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, nodeId not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
node->nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
vError("vgId:%d, failed to read %s, nodeFqdn not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
tstrncpy(node->nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
if (!nodeChanged) {
nodeChanged = dnodeCheckEpChanged(node->nodeId, node->nodeEp);
}
}
ret = TSDB_CODE_SUCCESS;
PARSE_VCFG_ERROR:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (nodeChanged) {
vnodeWriteCfg(&vnodeMsg);
}
if (ret == TSDB_CODE_SUCCESS) {
vnodeLoadCfg(pVnode, &vnodeMsg);
}
return ret;
}
int32_t vnodeWriteCfg(SMDCreateVnodeMsg *pMsg) {
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf(file, "%s/vnode%d/config.json", tsVnodeDir, pMsg->cfg.vgId);
FILE *fp = fopen(file, "w");
if (!fp) {
vError("vgId:%d, failed to write %s error:%s", pMsg->cfg.vgId, file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
int32_t len = 0;
int32_t maxLen = 1000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pMsg->db);
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pMsg->cfg.cfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pMsg->cfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pMsg->cfg.totalBlocks);
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pMsg->cfg.daysPerFile);
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pMsg->cfg.daysToKeep);
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pMsg->cfg.daysToKeep1);
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pMsg->cfg.daysToKeep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pMsg->cfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pMsg->cfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pMsg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pMsg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pMsg->cfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pMsg->cfg.fsyncPeriod);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pMsg->cfg.replications; i++) {
SMDVnodeDesc *node = &pMsg->nodes[i];
dnodeUpdateEp(node->nodeId, node->nodeEp, NULL, NULL);
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", node->nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", node->nodeEp);
if (i < pMsg->cfg.replications - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
vInfo("vgId:%d, successed to write %s", pMsg->cfg.vgId, file);
return TSDB_CODE_SUCCESS;
}
......@@ -31,15 +31,11 @@
#include "vnodeInt.h"
#include "query.h"
#include "dnode.h"
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
#include "vnodeCfg.h"
#include "vnodeVersion.h"
static SHashObj*tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
......@@ -128,7 +124,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
}
code = vnodeSaveCfg(pVnodeCfg);
code = vnodeWriteCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
return code;
......@@ -138,7 +134,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize;
tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks;
// tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
......@@ -186,7 +181,7 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
return TSDB_CODE_SUCCESS;
}
int32_t code = vnodeSaveCfg(pVnodeCfg);
int32_t code = vnodeWriteCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
......@@ -601,17 +596,19 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
SVnodeObj *pVnode = arg;
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->fversion = pVnode->version;
pVnode->fversion = pVnode->version;
return walRenew(pVnode->wal);
}
if (status == TSDB_STATUS_COMMIT_OVER)
if (status == TSDB_STATUS_COMMIT_OVER) {
return vnodeSaveVersion(pVnode);
}
return 0;
return 0;
}
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size,
uint64_t *fversion) {
SVnodeObj *pVnode = ahandle;
*fversion = pVnode->fversion;
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
......@@ -636,17 +633,18 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) {
SVnodeObj *pVnode = ahandle;
if (pVnode->delay != mseconds)
if (pVnode->delay != mseconds) {
vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
}
pVnode->delay = mseconds;
}
static int vnodeResetTsdb(SVnodeObj *pVnode)
{
static int vnodeResetTsdb(SVnodeObj *pVnode) {
char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) {
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) !=
TAOS_VN_STATUS_READY) {
return -1;
}
......@@ -671,7 +669,7 @@ static int vnodeResetTsdb(SVnodeObj *pVnode)
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
pVnode->status = TAOS_VN_STATUS_READY;
vnodeRelease(pVnode);
vnodeRelease(pVnode);
return 0;
}
......@@ -686,360 +684,3 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
return vnodeResetTsdb(pVnode);
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId);
FILE *fp = fopen(cfgFile, "w");
if (!fp) {
vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
int32_t len = 0;
int32_t maxLen = 1000;
char * content = calloc(1, maxLen + 1);
if (content == NULL) {
fclose(fp);
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pVnodeCfg->db);
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
// len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1);
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
// len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnodeCfg->cfg.fsyncPeriod);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
vInfo("vgId:%d, save vnode cfg, replica:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.replications);
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", pVnodeCfg->nodes[i].nodeEp);
vInfo("vgId:%d, save vnode cfg, nodeId:%d nodeEp:%s", pVnodeCfg->cfg.vgId, pVnodeCfg->nodes[i].nodeId,
pVnodeCfg->nodes[i].nodeEp);
if (i < pVnodeCfg->cfg.replications - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
vInfo("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
cJSON *root = NULL;
char *content = NULL;
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
int maxLen = 1000;
terrno = TSDB_CODE_VND_APP_ERROR;
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(cfgFile, "r");
if (!fp) {
vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId,
cfgFile, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto PARSE_OVER;
}
content = calloc(1, maxLen + 1);
if (content == NULL) goto PARSE_OVER;
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
free(content);
fclose(fp);
return errno;
}
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId);
goto PARSE_OVER;
}
cJSON *db = cJSON_GetObjectItem(root, "db");
if (!db || db->type != cJSON_String || db->valuestring == NULL) {
vError("vgId:%d, failed to read vnode cfg, db not found", pVnode->vgId);
goto PARSE_OVER;
}
strcpy(pVnode->db, db->valuestring);
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!cfgVersion || cfgVersion->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->cfgVersion = cfgVersion->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.cacheBlockSize = cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, totalBlocks not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
// cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
// if (!maxTables || maxTables->type != cJSON_Number) {
// vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
// goto PARSE_OVER;
// }
// pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;
cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
if (!daysToKeep || daysToKeep->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, daysToKeep not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep = daysToKeep->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, daysToKeep1 not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep1 = daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, daysToKeep2 not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep2 = daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, maxRowsPerFileBlock not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
// cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
// if (!commitTime || commitTime->type != cJSON_Number) {
// vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
// goto PARSE_OVER;
// }
// pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, precision not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.precision = (int8_t)precision->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression");
if (!compression || compression->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, compression not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, walLevel not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
if (!walLevel || walLevel->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, fsyncPeriod not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint;
cJSON *wals = cJSON_GetObjectItem(root, "wals");
if (!wals || wals->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->walCfg.wals = (int8_t)wals->valueint;
pVnode->walCfg.keep = 0;
cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, replica not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.replica = (int8_t)replica->valueint;
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) {
vError("vgId: %d, failed to read vnode cfg, quorum not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
vError("vgId:%d, failed to read vnode cfg, nodeInfos not found", pVnode->vgId);
goto PARSE_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != pVnode->syncCfg.replica) {
vError("vgId:%d, failed to read vnode cfg, nodeInfos size not matched", pVnode->vgId);
goto PARSE_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, nodeId not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
vError("vgId:%d, failed to read vnode cfg, nodeFqdn not found", pVnode->vgId);
goto PARSE_OVER;
}
taosGetFqdnPortFromEp(nodeEp->valuestring, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort);
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
}
terrno = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
vInfo("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
}
PARSE_OVER:
taosTFree(content);
cJSON_Delete(root);
if (fp) fclose(fp);
return terrno;
}
static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(versionFile, "w");
if (!fp) {
vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId,
versionFile, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
int32_t len = 0;
int32_t maxLen = 30;
char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0};
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
vInfo("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion);
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
char *content = NULL;
cJSON *root = NULL;
int maxLen = 100;
terrno = TSDB_CODE_VND_INVALID_VRESION_FILE;
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(versionFile, "r");
if (!fp) {
if (errno != ENOENT) {
vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_SUCCESS;
}
goto PARSE_OVER;
}
content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
goto PARSE_OVER;
}
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId);
goto PARSE_OVER;
}
cJSON *ver = cJSON_GetObjectItem(root, "version");
if (!ver || ver->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode version, version not found", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->version = ver->valueint;
terrno = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version);
PARSE_OVER:
taosTFree(content);
cJSON_Delete(root);
if (fp) fclose(fp);
return terrno;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "cJSON.h"
#include "tglobal.h"
#include "tsdb.h"
#include "vnodeInt.h"
#include "vnodeVersion.h"
int32_t vnodeReadVersion(SVnodeObj *pVnode) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t len = 0;
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
fp = fopen(file, "r");
if (!fp) {
if (errno != ENOENT) {
vError("vgId:%d, failed to read %s, error:%s", pVnode->vgId, file, strerror(errno));
ret = TAOS_SYSTEM_ERROR(errno);
} else {
ret = TSDB_CODE_SUCCESS;
}
goto PARSE_VER_ERROR;
}
fread(content, 1, maxLen, fp);
if (len <= 0) {
vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file);
goto PARSE_VER_ERROR;
}
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file);
goto PARSE_VER_ERROR;
}
cJSON *ver = cJSON_GetObjectItem(root, "version");
if (!ver || ver->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file);
goto PARSE_VER_ERROR;
}
pVnode->version = ver->valueint;
ret = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, read %s successfully, version:%" PRId64, pVnode->vgId, file, pVnode->version);
PARSE_VER_ERROR:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
return ret;
}
int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
char file[TSDB_FILENAME_LEN + 30] = {0};
sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(file, "w");
if (!fp) {
vError("vgId:%d, failed to write %s, reason:%s", pVnode->vgId, file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
vInfo("vgId:%d, successed to write %s, version:%" PRId64, pVnode->vgId, file, pVnode->fversion);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册