未验证 提交 0f99de15 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8526 from taosdata/feature/dnode3

rename files
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_CHECK_H_
#define _TD_DNODE_CHECK_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitCheck();
void dnodeCleanupCheck();
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_CHECK_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_CONFIG_H_
#define _TD_DNODE_CONFIG_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitConfig();
void dnodeCleanupConfig();
void dnodeUpdateCfg(SDnodeCfg *data);
void dnodeUpdateDnodeEps(SDnodeEps *data);
void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet);
int32_t dnodeGetDnodeId();
int64_t dnodeGetClusterId();
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_CONFIG_H_*/
\ No newline at end of file
...@@ -27,6 +27,20 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg); ...@@ -27,6 +27,20 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg);
void dnodeProcessStartupReq(SRpcMsg *pMsg); void dnodeProcessStartupReq(SRpcMsg *pMsg);
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
int32_t dnodeInitConfig();
void dnodeCleanupConfig();
void dnodeUpdateCfg(SDnodeCfg *data);
void dnodeUpdateDnodeEps(SDnodeEps *data);
void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet);
int32_t dnodeGetDnodeId();
int64_t dnodeGetClusterId();
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeCheck.h"
#define MIN_AVAIL_MEMORY_MB 32
static int32_t dnodeBindTcpPort(uint16_t port) {
#if 0
SOCKET serverSocket;
struct sockaddr_in server_addr;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
dError("failed to create tcp socket since %s", strerror(errno));
return -1;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
dError("failed to bind tcp port:%d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return -1;
}
if (listen(serverSocket, 5) < 0) {
dError("failed to listen tcp port:%d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return -1;
}
taosCloseSocket(serverSocket);
#endif
return 0;
}
static int32_t dnodeBindUdpPort(int16_t port) {
#if 0
SOCKET serverSocket;
struct sockaddr_in server_addr;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
dError("failed to create udp socket since %s", strerror(errno));
return -1;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
dError("failed to bind udp port:%d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return -1;
}
taosCloseSocket(serverSocket);
#endif
return 0;
}
static int32_t dnodeCheckNetwork() {
int32_t ret;
uint16_t startPort = tsServerPort;
for (uint16_t port = startPort; port < startPort + 12; port++) {
ret = dnodeBindTcpPort(port);
if (0 != ret) {
dError("failed to bind tcp port:%d", port);
return -1;
}
ret = dnodeBindUdpPort(port);
if (0 != ret) {
dError("failed to bind udp port:%d", port);
return -1;
}
}
return 0;
}
static int32_t dnodeCheckMem() {
#if 0
float memoryUsedMB;
float memoryAvailMB;
if (true != taosGetSysMemory(&memoryUsedMB)) {
dError("failed to get system memory since %s, errno:%u,", strerror(errno), errno);
return -1;
}
memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB;
if (memoryAvailMB < MIN_AVAIL_MEMORY_MB) {
dError("available memory %fMB less than the threshold %dMB", memoryAvailMB, MIN_AVAIL_MEMORY_MB);
return -1;
}
#endif
return 0;
}
static int32_t dnodeCheckDisk() {
#if 0
taosGetDisk();
if (tsAvailDataDirGB < tsMinimalDataDirGB) {
dError("dataDir disk size:%fGB less than threshold %fGB ", tsAvailDataDirGB, tsMinimalDataDirGB);
return -1;
}
if (tsAvailLogDirGB < tsMinimalLogDirGB) {
dError("logDir disk size:%fGB less than threshold %fGB", tsAvailLogDirGB, tsMinimalLogDirGB);
return -1;
}
if (tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
dError("tmpDir disk size:%fGB less than threshold %fGB", tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
return -1;
}
#endif
return 0;
}
static int32_t dnodeCheckCpu() { return 0; }
static int32_t dnodeCheckOs() { return 0; }
static int32_t dnodeCheckAccess() { return 0; }
static int32_t dnodeCheckVersion() { return 0; }
static int32_t dnodeCheckDatafile() { return 0; }
int32_t dnodeInitCheck() {
if (dnodeCheckNetwork() != 0) {
dError("failed to check network");
return -1;
}
if (dnodeCheckMem() != 0) {
dError("failed to check memory");
return -1;
}
if (dnodeCheckCpu() != 0) {
dError("failed to check cpu");
return -1;
}
if (dnodeCheckDisk() != 0) {
dError("failed to check disk");
return -1;
}
if (dnodeCheckOs() != 0) {
dError("failed to check os");
return -1;
}
if (dnodeCheckAccess() != 0) {
dError("failed to check access");
return -1;
}
if (dnodeCheckVersion() != 0) {
dError("failed to check version");
return -1;
}
if (dnodeCheckDatafile() != 0) {
dError("failed to check datafile");
return -1;
}
dInfo("dnode check is finished");
return 0;
}
void dnodeCleanupCheck() {}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeConfig.h"
#include "cJSON.h"
#include "thash.h"
static struct {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
SDnodeEps *dnodeEps;
SHashObj *dnodeHash;
SRpcEpSet mnodeEpSetForShell;
SRpcEpSet mnodeEpSetForPeer;
char file[PATH_MAX + 20];
pthread_mutex_t mutex;
} tsConfig;
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsConfig.mnodeEpSetForPeer;
pthread_mutex_unlock(&tsConfig.mutex);
}
static void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsConfig.mnodeEpSetForShell;
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeUpdateMnodeEps(SRpcEpSet *ep) {
if (ep != NULL || ep->numOfEps <= 0) {
dError("mnode is changed, but content is invalid, discard it");
return;
}
pthread_mutex_lock(&tsConfig.mutex);
dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse);
tsConfig.mnodeEpSetForPeer = *ep;
for (int32_t i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
}
tsConfig.mnodeEpSetForShell = *ep;
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) {
if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) ||
(epSet.port[i] == tsServerPort && forShell)) {
epSet.inUse = (i + 1) % epSet.numOfEps;
dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse);
}
}
epSet.port[i] = htons(epSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
}
static void dnodePrintEps() {
dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum);
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode);
}
}
static void dnodeResetEps(SDnodeEps *data) {
assert(data != NULL);
int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp);
if (data->dnodeNum > tsConfig.dnodeEps->dnodeNum) {
SDnodeEps *tmp = calloc(1, size);
if (tmp == NULL) return;
tfree(tsConfig.dnodeEps);
tsConfig.dnodeEps = tmp;
}
if (tsConfig.dnodeEps != data) {
memcpy(tsConfig.dnodeEps, data, size);
}
tsConfig.mnodeEpSetForPeer.inUse = 0;
tsConfig.mnodeEpSetForShell.inUse = 0;
int32_t index = 0;
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
if (!ep->isMnode) continue;
if (index >= TSDB_MAX_REPLICA) continue;
strcpy(tsConfig.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn);
strcpy(tsConfig.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn);
tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort;
tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort;
index++;
}
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
}
dnodePrintEps();
}
static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
bool changed = false;
pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
changed = strcmp(epstr, epSaved) != 0;
tstrncpy(epstr, epSaved, TSDB_EP_LEN);
}
pthread_mutex_unlock(&tsConfig.mutex);
return changed;
}
static int32_t dnodeReadEps() {
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
fp = fopen(tsConfig.file, "r");
if (!fp) {
dDebug("file %s not exist", tsConfig.file);
goto PRASE_EPS_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", tsConfig.file);
goto PRASE_EPS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", tsConfig.file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s since dnodeId not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tsConfig.dnodeId = atoi(dnodeId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tsConfig.dropped = atoi(dropped->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tsConfig.clusterId = atoll(clusterId->valuestring);
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s since dnodeInfos not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize <= 0) {
dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize);
goto PRASE_EPS_OVER;
}
tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (tsConfig.dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_EPS_OVER;
}
tsConfig.dnodeEps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s, dnodeId not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
ep->dnodeId = atoi(dnodeId->valuestring);
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
if (!isMnode || isMnode->type != cJSON_String) {
dError("failed to read %s, isMnode not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
ep->isMnode = atoi(isMnode->valuestring);
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_String) {
dError("failed to read %s, dnodePort not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
ep->dnodePort = atoi(dnodePort->valuestring);
}
dInfo("succcessed to read file %s", tsConfig.file);
dnodePrintEps();
PRASE_EPS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) {
dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp);
return -1;
}
dnodeResetEps(tsConfig.dnodeEps);
terrno = 0;
return 0;
}
static int32_t dnodeWriteEps() {
FILE *fp = fopen(tsConfig.file, "w");
if (!fp) {
dError("failed to write %s since %s", tsConfig.file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsConfig.dnodeId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsConfig.dropped);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort);
if (i < tsConfig.dnodeEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", tsConfig.file);
return 0;
}
int32_t dnodeInitConfig() {
tsConfig.dnodeId = 0;
tsConfig.dropped = 0;
tsConfig.clusterId = 0;
tsConfig.dnodeEps = NULL;
snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir);
pthread_mutex_init(&tsConfig.mutex, NULL);
tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsConfig.dnodeHash == NULL) return -1;
int32_t ret = dnodeReadEps();
if (ret == 0) {
dInfo("dnode eps is initialized");
}
return ret;
}
void dnodeCleanupConfig() {
pthread_mutex_lock(&tsConfig.mutex);
if (tsConfig.dnodeEps != NULL) {
free(tsConfig.dnodeEps);
tsConfig.dnodeEps = NULL;
}
if (tsConfig.dnodeHash) {
taosHashCleanup(tsConfig.dnodeHash);
tsConfig.dnodeHash = NULL;
}
pthread_mutex_unlock(&tsConfig.mutex);
pthread_mutex_destroy(&tsConfig.mutex);
}
void dnodeUpdateDnodeEps(SDnodeEps *data) {
if (data == NULL || data->dnodeNum <= 0) return;
pthread_mutex_lock(&tsConfig.mutex);
if (data->dnodeNum != tsConfig.dnodeEps->dnodeNum) {
dnodeResetEps(data);
dnodeWriteEps();
} else {
int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);
if (memcmp(tsConfig.dnodeEps, data, size) != 0) {
dnodeResetEps(data);
dnodeWriteEps();
}
}
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
if (port) *port = ep->dnodePort;
if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
}
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeUpdateCfg(SDnodeCfg *data) {
if (tsConfig.dnodeId != 0 && !data->dropped) return;
pthread_mutex_lock(&tsConfig.mutex);
tsConfig.dnodeId = data->dnodeId;
tsConfig.clusterId = data->clusterId;
tsConfig.dropped = data->dropped;
dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId);
dnodeWriteEps();
pthread_mutex_unlock(&tsConfig.mutex);
}
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsConfig.mutex);
dnodeId = tsConfig.dnodeId;
pthread_mutex_unlock(&tsConfig.mutex);
return dnodeId;
}
int64_t dnodeGetClusterId() {
int64_t clusterId = 0;
pthread_mutex_lock(&tsConfig.mutex);
clusterId = tsConfig.clusterId;
pthread_mutex_unlock(&tsConfig.mutex);
return clusterId;
}
...@@ -15,15 +15,411 @@ ...@@ -15,15 +15,411 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeDnode.h" #include "dnodeDnode.h"
#include "dnodeConfig.h" #include "dnodeTransport.h"
#include "mnode.h"
#include "tthread.h" #include "tthread.h"
#include "ttime.h" #include "ttime.h"
#include "vnode.h" #include "cJSON.h"
#include "thash.h"
static struct {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
SDnodeEps *dnodeEps;
SHashObj *dnodeHash;
SRpcEpSet mnodeEpSetForShell;
SRpcEpSet mnodeEpSetForPeer;
char file[PATH_MAX + 20];
pthread_mutex_t mutex;
} tsConfig;
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsConfig.mnodeEpSetForPeer;
pthread_mutex_unlock(&tsConfig.mutex);
}
static void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsConfig.mnodeEpSetForShell;
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeUpdateMnodeEps(SRpcEpSet *ep) {
if (ep != NULL || ep->numOfEps <= 0) {
dError("mnode is changed, but content is invalid, discard it");
return;
}
pthread_mutex_lock(&tsConfig.mutex);
dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse);
tsConfig.mnodeEpSetForPeer = *ep;
for (int32_t i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
}
tsConfig.mnodeEpSetForShell = *ep;
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) {
if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) ||
(epSet.port[i] == tsServerPort && forShell)) {
epSet.inUse = (i + 1) % epSet.numOfEps;
dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse);
}
}
epSet.port[i] = htons(epSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
}
static void dnodePrintEps() {
dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum);
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode);
}
}
static void dnodeResetEps(SDnodeEps *data) {
assert(data != NULL);
int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp);
if (data->dnodeNum > tsConfig.dnodeEps->dnodeNum) {
SDnodeEps *tmp = calloc(1, size);
if (tmp == NULL) return;
tfree(tsConfig.dnodeEps);
tsConfig.dnodeEps = tmp;
}
if (tsConfig.dnodeEps != data) {
memcpy(tsConfig.dnodeEps, data, size);
}
tsConfig.mnodeEpSetForPeer.inUse = 0;
tsConfig.mnodeEpSetForShell.inUse = 0;
int32_t index = 0;
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
if (!ep->isMnode) continue;
if (index >= TSDB_MAX_REPLICA) continue;
strcpy(tsConfig.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn);
strcpy(tsConfig.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn);
tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort;
tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort;
index++;
}
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
}
dnodePrintEps();
}
static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
bool changed = false;
pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
changed = strcmp(epstr, epSaved) != 0;
tstrncpy(epstr, epSaved, TSDB_EP_LEN);
}
pthread_mutex_unlock(&tsConfig.mutex);
return changed;
}
static int32_t dnodeReadEps() {
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
fp = fopen(tsConfig.file, "r");
if (!fp) {
dDebug("file %s not exist", tsConfig.file);
goto PRASE_EPS_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", tsConfig.file);
goto PRASE_EPS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", tsConfig.file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s since dnodeId not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tsConfig.dnodeId = atoi(dnodeId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tsConfig.dropped = atoi(dropped->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tsConfig.clusterId = atoll(clusterId->valuestring);
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s since dnodeInfos not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize <= 0) {
dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize);
goto PRASE_EPS_OVER;
}
tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (tsConfig.dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_EPS_OVER;
}
tsConfig.dnodeEps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s, dnodeId not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
ep->dnodeId = atoi(dnodeId->valuestring);
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
if (!isMnode || isMnode->type != cJSON_String) {
dError("failed to read %s, isMnode not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
ep->isMnode = atoi(isMnode->valuestring);
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_String) {
dError("failed to read %s, dnodePort not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
ep->dnodePort = atoi(dnodePort->valuestring);
}
dInfo("succcessed to read file %s", tsConfig.file);
dnodePrintEps();
PRASE_EPS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) {
dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp);
return -1;
}
dnodeResetEps(tsConfig.dnodeEps);
terrno = 0;
return 0;
}
static int32_t dnodeWriteEps() {
FILE *fp = fopen(tsConfig.file, "w");
if (!fp) {
dError("failed to write %s since %s", tsConfig.file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsConfig.dnodeId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsConfig.dropped);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort);
if (i < tsConfig.dnodeEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", tsConfig.file);
return 0;
}
int32_t dnodeInitConfig() {
tsConfig.dnodeId = 0;
tsConfig.dropped = 0;
tsConfig.clusterId = 0;
tsConfig.dnodeEps = NULL;
snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir);
pthread_mutex_init(&tsConfig.mutex, NULL);
tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsConfig.dnodeHash == NULL) return -1;
int32_t ret = dnodeReadEps();
if (ret == 0) {
dInfo("dnode eps is initialized");
}
return ret;
}
void dnodeCleanupConfig() {
pthread_mutex_lock(&tsConfig.mutex);
if (tsConfig.dnodeEps != NULL) {
free(tsConfig.dnodeEps);
tsConfig.dnodeEps = NULL;
}
if (tsConfig.dnodeHash) {
taosHashCleanup(tsConfig.dnodeHash);
tsConfig.dnodeHash = NULL;
}
pthread_mutex_unlock(&tsConfig.mutex);
pthread_mutex_destroy(&tsConfig.mutex);
}
void dnodeUpdateDnodeEps(SDnodeEps *data) {
if (data == NULL || data->dnodeNum <= 0) return;
pthread_mutex_lock(&tsConfig.mutex);
if (data->dnodeNum != tsConfig.dnodeEps->dnodeNum) {
dnodeResetEps(data);
dnodeWriteEps();
} else {
int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);
if (memcmp(tsConfig.dnodeEps, data, size) != 0) {
dnodeResetEps(data);
dnodeWriteEps();
}
}
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
if (port) *port = ep->dnodePort;
if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
}
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeUpdateCfg(SDnodeCfg *data) {
if (tsConfig.dnodeId != 0 && !data->dropped) return;
pthread_mutex_lock(&tsConfig.mutex);
tsConfig.dnodeId = data->dnodeId;
tsConfig.clusterId = data->clusterId;
tsConfig.dropped = data->dropped;
dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId);
dnodeWriteEps();
pthread_mutex_unlock(&tsConfig.mutex);
}
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsConfig.mutex);
dnodeId = tsConfig.dnodeId;
pthread_mutex_unlock(&tsConfig.mutex);
return dnodeId;
}
int64_t dnodeGetClusterId() {
int64_t clusterId = 0;
pthread_mutex_lock(&tsConfig.mutex);
clusterId = tsConfig.clusterId;
pthread_mutex_unlock(&tsConfig.mutex);
return clusterId;
}
static struct { static struct {
pthread_t *threadId; pthread_t *threadId;
bool stop; bool threadStop;
uint32_t rebootTime; uint32_t rebootTime;
} tsDnode; } tsDnode;
...@@ -93,14 +489,14 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -93,14 +489,14 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
static void *dnodeThreadRoutine(void *param) { static void *dnodeThreadRoutine(void *param) {
int32_t ms = tsStatusInterval * 1000; int32_t ms = tsStatusInterval * 1000;
while (!tsDnode.stop) { while (!tsDnode.threadStop) {
taosMsleep(ms); taosMsleep(ms);
dnodeSendStatusMsg(); dnodeSendStatusMsg();
} }
} }
int32_t dnodeInitDnode() { int32_t dnodeInitDnode() {
tsDnode.stop = false; tsDnode.threadStop = false;
tsDnode.rebootTime = taosGetTimestampSec(); tsDnode.rebootTime = taosGetTimestampSec();
tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL); tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL);
if (tsDnode.threadId == NULL) { if (tsDnode.threadId == NULL) {
...@@ -113,7 +509,7 @@ int32_t dnodeInitDnode() { ...@@ -113,7 +509,7 @@ int32_t dnodeInitDnode() {
void dnodeCleanupDnode() { void dnodeCleanupDnode() {
if (tsDnode.threadId != NULL) { if (tsDnode.threadId != NULL) {
tsDnode.stop = true; tsDnode.threadStop = true;
taosDestoryThread(tsDnode.threadId); taosDestoryThread(tsDnode.threadId);
tsDnode.threadId = NULL; tsDnode.threadId = NULL;
} }
...@@ -121,34 +517,6 @@ void dnodeCleanupDnode() { ...@@ -121,34 +517,6 @@ void dnodeCleanupDnode() {
dInfo("dnode msg is cleanuped"); dInfo("dnode msg is cleanuped");
} }
static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->mnodeNum = htonl(pCfg->mnodeNum);
for (int32_t i = 0; i < pCfg->mnodeNum; ++i) {
pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId);
pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort);
}
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (mnodeGetStatus() == MN_STATUS_READY) return 0;
return mnodeDeploy();
}
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
int32_t code = dnodeStartMnode(pMsg);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont; SCfgDnodeMsg *pCfg = pMsg->pCont;
......
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeCheck.h"
#include "dnodeConfig.h"
#include "dnodeDnode.h" #include "dnodeDnode.h"
#include "dnodeMnode.h" #include "dnodeMnode.h"
#include "dnodeTransport.h" #include "dnodeTransport.h"
...@@ -53,46 +51,6 @@ static void dnodeReportStartupFinished(char *name, char *desc) { ...@@ -53,46 +51,6 @@ static void dnodeReportStartupFinished(char *name, char *desc) {
void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsInt.startup, sizeof(SStartupStep)); } void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsInt.startup, sizeof(SStartupStep)); }
static int32_t dnodeInitMain() {
tsInt.runStatus = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
return taosCheckGlobalCfg();
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
static int32_t dnodeCheckRunning(char *dir) { static int32_t dnodeCheckRunning(char *dir) {
char filepath[256] = {0}; char filepath[256] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dir); snprintf(filepath, sizeof(filepath), "%s/.running", dir);
...@@ -140,24 +98,65 @@ static int32_t dnodeInitDir() { ...@@ -140,24 +98,65 @@ static int32_t dnodeInitDir() {
return 0; return 0;
} }
static void dnodeCleanupDir() {} static int32_t dnodeInitMain() {
tsInt.runStatus = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
if (taosCheckGlobalCfg() != 0) {
return -1;
}
dnodeInitDir();
return -1;
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
int32_t dnodeInit() { int32_t dnodeInit() {
SSteps *steps = taosStepInit(24, dnodeReportStartup); SSteps *steps = taosStepInit(24, dnodeReportStartup);
if (steps == NULL) return -1; if (steps == NULL) return -1;
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir);
taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck);
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup); taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", NULL, NULL); taosStepAdd(steps, "dnode-tfs", NULL, NULL);
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd(steps, "dnode-config", dnodeInitConfig, dnodeCleanupConfig); taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode);
taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes); taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode); taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode);
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode);
tsInt.steps = steps; tsInt.steps = steps;
taosStepExec(tsInt.steps); taosStepExec(tsInt.steps);
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeMnode.h" #include "dnodeMnode.h"
#include "dnodeConfig.h" #include "dnodeDnode.h"
#include "dnodeTransport.h" #include "dnodeTransport.h"
#include "mnode.h" #include "mnode.h"
...@@ -32,3 +32,31 @@ int32_t dnodeInitMnode() { ...@@ -32,3 +32,31 @@ int32_t dnodeInitMnode() {
} }
void dnodeCleanupMnode() { mnodeCleanup(); } void dnodeCleanupMnode() { mnodeCleanup(); }
static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->mnodeNum = htonl(pCfg->mnodeNum);
for (int32_t i = 0; i < pCfg->mnodeNum; ++i) {
pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId);
pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort);
}
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (mnodeGetStatus() == MN_STATUS_READY) return 0;
return mnodeDeploy();
}
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
int32_t code = dnodeStartMnode(pMsg);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
\ No newline at end of file
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeTransport.h" #include "dnodeTransport.h"
#include "dnodeConfig.h"
#include "dnodeDnode.h" #include "dnodeDnode.h"
#include "dnodeMnode.h" #include "dnodeMnode.h"
#include "dnodeVnodes.h" #include "dnodeVnodes.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册