diff --git a/source/dnode/mgmt/inc/dnodeCheck.h b/source/dnode/mgmt/inc/dnodeCheck.h deleted file mode 100644 index 97ac524b3f5d8cc38c7918d48a678f0cfabefbb4..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/inc/dnodeCheck.h +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DNODE_CHECK_H_ -#define _TD_DNODE_CHECK_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "dnodeInt.h" - -int32_t dnodeInitCheck(); -void dnodeCleanupCheck(); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DNODE_CHECK_H_*/ diff --git a/source/dnode/mgmt/inc/dnodeConfig.h b/source/dnode/mgmt/inc/dnodeConfig.h deleted file mode 100644 index 71f3ac3f97c940d16243079d6942ef0140aa2465..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/inc/dnodeConfig.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DNODE_CONFIG_H_ -#define _TD_DNODE_CONFIG_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "dnodeInt.h" - -int32_t dnodeInitConfig(); -void dnodeCleanupConfig(); - -void dnodeUpdateCfg(SDnodeCfg *data); -void dnodeUpdateDnodeEps(SDnodeEps *data); -void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet); -int32_t dnodeGetDnodeId(); -int64_t dnodeGetClusterId(); -void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); - -void dnodeGetEpSetForPeer(SRpcEpSet *epSet); -void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DNODE_CONFIG_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/inc/dnodeDnode.h b/source/dnode/mgmt/inc/dnodeDnode.h index b126b39d1c3738d03aaaab549397ceec1f7c1b8a..aba80752a217fce36b59710883c525d270dadb69 100644 --- a/source/dnode/mgmt/inc/dnodeDnode.h +++ b/source/dnode/mgmt/inc/dnodeDnode.h @@ -27,6 +27,20 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg); void dnodeProcessStartupReq(SRpcMsg *pMsg); void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); +int32_t dnodeInitConfig(); +void dnodeCleanupConfig(); + +void dnodeUpdateCfg(SDnodeCfg *data); +void dnodeUpdateDnodeEps(SDnodeEps *data); +void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet); +int32_t dnodeGetDnodeId(); +int64_t dnodeGetClusterId(); +void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); + +void dnodeGetEpSetForPeer(SRpcEpSet *epSet); +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); + + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/src/dnodeCheck.c b/source/dnode/mgmt/src/dnodeCheck.c deleted file mode 100644 index b7bd930bb36d84fb0c693cc74a3313c15f5b3318..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/src/dnodeCheck.c +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dnodeCheck.h" - -#define MIN_AVAIL_MEMORY_MB 32 - -static int32_t dnodeBindTcpPort(uint16_t port) { -#if 0 - SOCKET serverSocket; - struct sockaddr_in server_addr; - - if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - dError("failed to create tcp socket since %s", strerror(errno)); - return -1; - } - - bzero(&server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); - server_addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - dError("failed to bind tcp port:%d since %s", port, strerror(errno)); - taosCloseSocket(serverSocket); - return -1; - } - - if (listen(serverSocket, 5) < 0) { - dError("failed to listen tcp port:%d since %s", port, strerror(errno)); - taosCloseSocket(serverSocket); - return -1; - } - - taosCloseSocket(serverSocket); -#endif - return 0; -} - -static int32_t dnodeBindUdpPort(int16_t port) { -#if 0 - SOCKET serverSocket; - struct sockaddr_in server_addr; - - if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - dError("failed to create udp socket since %s", strerror(errno)); - return -1; - } - - bzero(&server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); - server_addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - dError("failed to bind udp port:%d since %s", port, strerror(errno)); - taosCloseSocket(serverSocket); - return -1; - } - - taosCloseSocket(serverSocket); -#endif - return 0; -} - -static int32_t dnodeCheckNetwork() { - int32_t ret; - uint16_t startPort = tsServerPort; - - for (uint16_t port = startPort; port < startPort + 12; port++) { - ret = dnodeBindTcpPort(port); - if (0 != ret) { - dError("failed to bind tcp port:%d", port); - return -1; - } - ret = dnodeBindUdpPort(port); - if (0 != ret) { - dError("failed to bind udp port:%d", port); - return -1; - } - } - - return 0; -} - -static int32_t dnodeCheckMem() { -#if 0 - float memoryUsedMB; - float memoryAvailMB; - if (true != taosGetSysMemory(&memoryUsedMB)) { - dError("failed to get system memory since %s, errno:%u,", strerror(errno), errno); - return -1; - } - - memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB; - - if (memoryAvailMB < MIN_AVAIL_MEMORY_MB) { - dError("available memory %fMB less than the threshold %dMB", memoryAvailMB, MIN_AVAIL_MEMORY_MB); - return -1; - } -#endif - return 0; -} - -static int32_t dnodeCheckDisk() { -#if 0 - taosGetDisk(); - - if (tsAvailDataDirGB < tsMinimalDataDirGB) { - dError("dataDir disk size:%fGB less than threshold %fGB ", tsAvailDataDirGB, tsMinimalDataDirGB); - return -1; - } - - if (tsAvailLogDirGB < tsMinimalLogDirGB) { - dError("logDir disk size:%fGB less than threshold %fGB", tsAvailLogDirGB, tsMinimalLogDirGB); - return -1; - } - - if (tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) { - dError("tmpDir disk size:%fGB less than threshold %fGB", tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace); - return -1; - } -#endif - return 0; -} - -static int32_t dnodeCheckCpu() { return 0; } -static int32_t dnodeCheckOs() { return 0; } -static int32_t dnodeCheckAccess() { return 0; } -static int32_t dnodeCheckVersion() { return 0; } -static int32_t dnodeCheckDatafile() { return 0; } - -int32_t dnodeInitCheck() { - if (dnodeCheckNetwork() != 0) { - dError("failed to check network"); - return -1; - } - - if (dnodeCheckMem() != 0) { - dError("failed to check memory"); - return -1; - } - - if (dnodeCheckCpu() != 0) { - dError("failed to check cpu"); - return -1; - } - - if (dnodeCheckDisk() != 0) { - dError("failed to check disk"); - return -1; - } - - if (dnodeCheckOs() != 0) { - dError("failed to check os"); - return -1; - } - - if (dnodeCheckAccess() != 0) { - dError("failed to check access"); - return -1; - } - - if (dnodeCheckVersion() != 0) { - dError("failed to check version"); - return -1; - } - - if (dnodeCheckDatafile() != 0) { - dError("failed to check datafile"); - return -1; - } - - dInfo("dnode check is finished"); - return 0; -} - -void dnodeCleanupCheck() {} \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeConfig.c b/source/dnode/mgmt/src/dnodeConfig.c deleted file mode 100644 index 2a0374ffafa994f0195c1fe7b102ba169b46d5e0..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/src/dnodeConfig.c +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dnodeConfig.h" -#include "cJSON.h" -#include "thash.h" - -static struct { - int32_t dnodeId; - int32_t dropped; - int64_t clusterId; - SDnodeEps *dnodeEps; - SHashObj *dnodeHash; - SRpcEpSet mnodeEpSetForShell; - SRpcEpSet mnodeEpSetForPeer; - char file[PATH_MAX + 20]; - pthread_mutex_t mutex; -} tsConfig; - -void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { - pthread_mutex_lock(&tsConfig.mutex); - *epSet = tsConfig.mnodeEpSetForPeer; - pthread_mutex_unlock(&tsConfig.mutex); -} - -static void dnodeGetEpSetForShell(SRpcEpSet *epSet) { - pthread_mutex_lock(&tsConfig.mutex); - *epSet = tsConfig.mnodeEpSetForShell; - pthread_mutex_unlock(&tsConfig.mutex); -} - -void dnodeUpdateMnodeEps(SRpcEpSet *ep) { - if (ep != NULL || ep->numOfEps <= 0) { - dError("mnode is changed, but content is invalid, discard it"); - return; - } - - pthread_mutex_lock(&tsConfig.mutex); - - dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse); - - tsConfig.mnodeEpSetForPeer = *ep; - for (int32_t i = 0; i < ep->numOfEps; ++i) { - ep->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); - } - tsConfig.mnodeEpSetForShell = *ep; - - pthread_mutex_unlock(&tsConfig.mutex); -} - -void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { - SRpcConnInfo connInfo = {0}; - rpcGetConnInfo(rpcMsg->handle, &connInfo); - - SRpcEpSet epSet = {0}; - if (forShell) { - dnodeGetEpSetForShell(&epSet); - } else { - dnodeGetEpSetForPeer(&epSet); - } - - dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse); - - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); - if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) { - if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) || - (epSet.port[i] == tsServerPort && forShell)) { - epSet.inUse = (i + 1) % epSet.numOfEps; - dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse); - } - } - - epSet.port[i] = htons(epSet.port[i]); - } - - rpcSendRedirectRsp(rpcMsg->handle, &epSet); -} - -static void dnodePrintEps() { - dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum); - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; - dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode); - } -} - -static void dnodeResetEps(SDnodeEps *data) { - assert(data != NULL); - - int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp); - - if (data->dnodeNum > tsConfig.dnodeEps->dnodeNum) { - SDnodeEps *tmp = calloc(1, size); - if (tmp == NULL) return; - - tfree(tsConfig.dnodeEps); - tsConfig.dnodeEps = tmp; - } - - if (tsConfig.dnodeEps != data) { - memcpy(tsConfig.dnodeEps, data, size); - } - - tsConfig.mnodeEpSetForPeer.inUse = 0; - tsConfig.mnodeEpSetForShell.inUse = 0; - int32_t index = 0; - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; - if (!ep->isMnode) continue; - if (index >= TSDB_MAX_REPLICA) continue; - strcpy(tsConfig.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn); - strcpy(tsConfig.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn); - tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort; - tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort; - index++; - } - - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; - taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); - } - - dnodePrintEps(); -} - -static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { - bool changed = false; - - pthread_mutex_lock(&tsConfig.mutex); - - SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t)); - if (ep != NULL) { - char epSaved[TSDB_EP_LEN + 1]; - snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); - changed = strcmp(epstr, epSaved) != 0; - tstrncpy(epstr, epSaved, TSDB_EP_LEN); - } - - pthread_mutex_unlock(&tsConfig.mutex); - - return changed; -} - -static int32_t dnodeReadEps() { - int32_t len = 0; - int32_t maxLen = 30000; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - FILE *fp = NULL; - - fp = fopen(tsConfig.file, "r"); - if (!fp) { - dDebug("file %s not exist", tsConfig.file); - goto PRASE_EPS_OVER; - } - - len = (int32_t)fread(content, 1, maxLen, fp); - if (len <= 0) { - dError("failed to read %s since content is null", tsConfig.file); - goto PRASE_EPS_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", tsConfig.file); - goto PRASE_EPS_OVER; - } - - cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s since dnodeId not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - tsConfig.dnodeId = atoi(dnodeId->valuestring); - - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_String) { - dError("failed to read %s since dropped not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - tsConfig.dropped = atoi(dropped->valuestring); - - cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); - if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - tsConfig.clusterId = atoll(clusterId->valuestring); - - cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); - if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - - int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); - if (dnodeInfosSize <= 0) { - dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize); - goto PRASE_EPS_OVER; - } - - tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); - if (tsConfig.dnodeEps == NULL) { - dError("failed to calloc dnodeEpList since %s", strerror(errno)); - goto PRASE_EPS_OVER; - } - tsConfig.dnodeEps->dnodeNum = dnodeInfosSize; - - for (int32_t i = 0; i < dnodeInfosSize; ++i) { - cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); - if (dnodeInfo == NULL) break; - - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; - - cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s, dnodeId not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - ep->dnodeId = atoi(dnodeId->valuestring); - - cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); - if (!isMnode || isMnode->type != cJSON_String) { - dError("failed to read %s, isMnode not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - ep->isMnode = atoi(isMnode->valuestring); - - cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); - if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); - - cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); - if (!dnodePort || dnodePort->type != cJSON_String) { - dError("failed to read %s, dnodePort not found", tsConfig.file); - goto PRASE_EPS_OVER; - } - ep->dnodePort = atoi(dnodePort->valuestring); - } - - dInfo("succcessed to read file %s", tsConfig.file); - dnodePrintEps(); - -PRASE_EPS_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (fp != NULL) fclose(fp); - - if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) { - dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp); - return -1; - } - - dnodeResetEps(tsConfig.dnodeEps); - - terrno = 0; - return 0; -} - -static int32_t dnodeWriteEps() { - FILE *fp = fopen(tsConfig.file, "w"); - if (!fp) { - dError("failed to write %s since %s", tsConfig.file, strerror(errno)); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 30000; - char *content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsConfig.dnodeId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsConfig.dropped); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId); - len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId); - len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode); - len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); - len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort); - if (i < tsConfig.dnodeEps->dnodeNum - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - taosFsyncFile(fileno(fp)); - fclose(fp); - free(content); - terrno = 0; - - dInfo("successed to write %s", tsConfig.file); - return 0; -} - -int32_t dnodeInitConfig() { - tsConfig.dnodeId = 0; - tsConfig.dropped = 0; - tsConfig.clusterId = 0; - tsConfig.dnodeEps = NULL; - snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir); - pthread_mutex_init(&tsConfig.mutex, NULL); - - tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (tsConfig.dnodeHash == NULL) return -1; - - int32_t ret = dnodeReadEps(); - if (ret == 0) { - dInfo("dnode eps is initialized"); - } - - return ret; -} - -void dnodeCleanupConfig() { - pthread_mutex_lock(&tsConfig.mutex); - - if (tsConfig.dnodeEps != NULL) { - free(tsConfig.dnodeEps); - tsConfig.dnodeEps = NULL; - } - - if (tsConfig.dnodeHash) { - taosHashCleanup(tsConfig.dnodeHash); - tsConfig.dnodeHash = NULL; - } - - pthread_mutex_unlock(&tsConfig.mutex); - pthread_mutex_destroy(&tsConfig.mutex); -} - -void dnodeUpdateDnodeEps(SDnodeEps *data) { - if (data == NULL || data->dnodeNum <= 0) return; - - pthread_mutex_lock(&tsConfig.mutex); - - if (data->dnodeNum != tsConfig.dnodeEps->dnodeNum) { - dnodeResetEps(data); - dnodeWriteEps(); - } else { - int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); - if (memcmp(tsConfig.dnodeEps, data, size) != 0) { - dnodeResetEps(data); - dnodeWriteEps(); - } - } - - pthread_mutex_unlock(&tsConfig.mutex); -} - -void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { - pthread_mutex_lock(&tsConfig.mutex); - - SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t)); - if (ep != NULL) { - if (port) *port = ep->dnodePort; - if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); - if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); - } - - pthread_mutex_unlock(&tsConfig.mutex); -} - -void dnodeUpdateCfg(SDnodeCfg *data) { - if (tsConfig.dnodeId != 0 && !data->dropped) return; - - pthread_mutex_lock(&tsConfig.mutex); - - tsConfig.dnodeId = data->dnodeId; - tsConfig.clusterId = data->clusterId; - tsConfig.dropped = data->dropped; - dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId); - - dnodeWriteEps(); - pthread_mutex_unlock(&tsConfig.mutex); -} - -int32_t dnodeGetDnodeId() { - int32_t dnodeId = 0; - pthread_mutex_lock(&tsConfig.mutex); - dnodeId = tsConfig.dnodeId; - pthread_mutex_unlock(&tsConfig.mutex); - return dnodeId; -} - -int64_t dnodeGetClusterId() { - int64_t clusterId = 0; - pthread_mutex_lock(&tsConfig.mutex); - clusterId = tsConfig.clusterId; - pthread_mutex_unlock(&tsConfig.mutex); - return clusterId; -} diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 3c6a743e05b0a7dcd62346d9961e27341e4c27e4..722ef30099beb8700af6a7029b31861164fce391 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -15,15 +15,411 @@ #define _DEFAULT_SOURCE #include "dnodeDnode.h" -#include "dnodeConfig.h" -#include "mnode.h" +#include "dnodeTransport.h" #include "tthread.h" #include "ttime.h" -#include "vnode.h" +#include "cJSON.h" +#include "thash.h" + +static struct { + int32_t dnodeId; + int32_t dropped; + int64_t clusterId; + SDnodeEps *dnodeEps; + SHashObj *dnodeHash; + SRpcEpSet mnodeEpSetForShell; + SRpcEpSet mnodeEpSetForPeer; + char file[PATH_MAX + 20]; + pthread_mutex_t mutex; +} tsConfig; + +void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsConfig.mutex); + *epSet = tsConfig.mnodeEpSetForPeer; + pthread_mutex_unlock(&tsConfig.mutex); +} + +static void dnodeGetEpSetForShell(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsConfig.mutex); + *epSet = tsConfig.mnodeEpSetForShell; + pthread_mutex_unlock(&tsConfig.mutex); +} + +void dnodeUpdateMnodeEps(SRpcEpSet *ep) { + if (ep != NULL || ep->numOfEps <= 0) { + dError("mnode is changed, but content is invalid, discard it"); + return; + } + + pthread_mutex_lock(&tsConfig.mutex); + + dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse); + + tsConfig.mnodeEpSetForPeer = *ep; + for (int32_t i = 0; i < ep->numOfEps; ++i) { + ep->port[i] -= TSDB_PORT_DNODEDNODE; + dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); + } + tsConfig.mnodeEpSetForShell = *ep; + + pthread_mutex_unlock(&tsConfig.mutex); +} + +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { + SRpcConnInfo connInfo = {0}; + rpcGetConnInfo(rpcMsg->handle, &connInfo); + + SRpcEpSet epSet = {0}; + if (forShell) { + dnodeGetEpSetForShell(&epSet); + } else { + dnodeGetEpSetForPeer(&epSet); + } + + dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse); + + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); + if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) { + if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) || + (epSet.port[i] == tsServerPort && forShell)) { + epSet.inUse = (i + 1) % epSet.numOfEps; + dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse); + } + } + + epSet.port[i] = htons(epSet.port[i]); + } + + rpcSendRedirectRsp(rpcMsg->handle, &epSet); +} + +static void dnodePrintEps() { + dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum); + for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode); + } +} + +static void dnodeResetEps(SDnodeEps *data) { + assert(data != NULL); + + int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp); + + if (data->dnodeNum > tsConfig.dnodeEps->dnodeNum) { + SDnodeEps *tmp = calloc(1, size); + if (tmp == NULL) return; + + tfree(tsConfig.dnodeEps); + tsConfig.dnodeEps = tmp; + } + + if (tsConfig.dnodeEps != data) { + memcpy(tsConfig.dnodeEps, data, size); + } + + tsConfig.mnodeEpSetForPeer.inUse = 0; + tsConfig.mnodeEpSetForShell.inUse = 0; + int32_t index = 0; + for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + if (!ep->isMnode) continue; + if (index >= TSDB_MAX_REPLICA) continue; + strcpy(tsConfig.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn); + strcpy(tsConfig.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn); + tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort; + tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort; + index++; + } + + for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); + } + + dnodePrintEps(); +} + +static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { + bool changed = false; + + pthread_mutex_lock(&tsConfig.mutex); + + SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t)); + if (ep != NULL) { + char epSaved[TSDB_EP_LEN + 1]; + snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); + changed = strcmp(epstr, epSaved) != 0; + tstrncpy(epstr, epSaved, TSDB_EP_LEN); + } + + pthread_mutex_unlock(&tsConfig.mutex); + + return changed; +} + +static int32_t dnodeReadEps() { + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; + + fp = fopen(tsConfig.file, "r"); + if (!fp) { + dDebug("file %s not exist", tsConfig.file); + goto PRASE_EPS_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", tsConfig.file); + goto PRASE_EPS_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", tsConfig.file); + goto PRASE_EPS_OVER; + } + + cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_String) { + dError("failed to read %s since dnodeId not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + tsConfig.dnodeId = atoi(dnodeId->valuestring); + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_String) { + dError("failed to read %s since dropped not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + tsConfig.dropped = atoi(dropped->valuestring); + + cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); + if (!clusterId || clusterId->type != cJSON_String) { + dError("failed to read %s since clusterId not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + tsConfig.clusterId = atoll(clusterId->valuestring); + + cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); + if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { + dError("failed to read %s since dnodeInfos not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + + int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); + if (dnodeInfosSize <= 0) { + dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize); + goto PRASE_EPS_OVER; + } + + tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + if (tsConfig.dnodeEps == NULL) { + dError("failed to calloc dnodeEpList since %s", strerror(errno)); + goto PRASE_EPS_OVER; + } + tsConfig.dnodeEps->dnodeNum = dnodeInfosSize; + + for (int32_t i = 0; i < dnodeInfosSize; ++i) { + cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); + if (dnodeInfo == NULL) break; + + SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + + cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_String) { + dError("failed to read %s, dnodeId not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + ep->dnodeId = atoi(dnodeId->valuestring); + + cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); + if (!isMnode || isMnode->type != cJSON_String) { + dError("failed to read %s, isMnode not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + ep->isMnode = atoi(isMnode->valuestring); + + cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); + if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { + dError("failed to read %s, dnodeFqdn not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); + + cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); + if (!dnodePort || dnodePort->type != cJSON_String) { + dError("failed to read %s, dnodePort not found", tsConfig.file); + goto PRASE_EPS_OVER; + } + ep->dnodePort = atoi(dnodePort->valuestring); + } + + dInfo("succcessed to read file %s", tsConfig.file); + dnodePrintEps(); + +PRASE_EPS_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) { + dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp); + return -1; + } + + dnodeResetEps(tsConfig.dnodeEps); + + terrno = 0; + return 0; +} + +static int32_t dnodeWriteEps() { + FILE *fp = fopen(tsConfig.file, "w"); + if (!fp) { + dError("failed to write %s since %s", tsConfig.file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsConfig.dnodeId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsConfig.dropped); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId); + len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); + for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId); + len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode); + len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); + len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort); + if (i < tsConfig.dnodeEps->dnodeNum - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + terrno = 0; + + dInfo("successed to write %s", tsConfig.file); + return 0; +} + +int32_t dnodeInitConfig() { + tsConfig.dnodeId = 0; + tsConfig.dropped = 0; + tsConfig.clusterId = 0; + tsConfig.dnodeEps = NULL; + snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir); + pthread_mutex_init(&tsConfig.mutex, NULL); + + tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsConfig.dnodeHash == NULL) return -1; + + int32_t ret = dnodeReadEps(); + if (ret == 0) { + dInfo("dnode eps is initialized"); + } + + return ret; +} + +void dnodeCleanupConfig() { + pthread_mutex_lock(&tsConfig.mutex); + + if (tsConfig.dnodeEps != NULL) { + free(tsConfig.dnodeEps); + tsConfig.dnodeEps = NULL; + } + + if (tsConfig.dnodeHash) { + taosHashCleanup(tsConfig.dnodeHash); + tsConfig.dnodeHash = NULL; + } + + pthread_mutex_unlock(&tsConfig.mutex); + pthread_mutex_destroy(&tsConfig.mutex); +} + +void dnodeUpdateDnodeEps(SDnodeEps *data) { + if (data == NULL || data->dnodeNum <= 0) return; + + pthread_mutex_lock(&tsConfig.mutex); + + if (data->dnodeNum != tsConfig.dnodeEps->dnodeNum) { + dnodeResetEps(data); + dnodeWriteEps(); + } else { + int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); + if (memcmp(tsConfig.dnodeEps, data, size) != 0) { + dnodeResetEps(data); + dnodeWriteEps(); + } + } + + pthread_mutex_unlock(&tsConfig.mutex); +} + +void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { + pthread_mutex_lock(&tsConfig.mutex); + + SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t)); + if (ep != NULL) { + if (port) *port = ep->dnodePort; + if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); + if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); + } + + pthread_mutex_unlock(&tsConfig.mutex); +} + +void dnodeUpdateCfg(SDnodeCfg *data) { + if (tsConfig.dnodeId != 0 && !data->dropped) return; + + pthread_mutex_lock(&tsConfig.mutex); + + tsConfig.dnodeId = data->dnodeId; + tsConfig.clusterId = data->clusterId; + tsConfig.dropped = data->dropped; + dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId); + + dnodeWriteEps(); + pthread_mutex_unlock(&tsConfig.mutex); +} + +int32_t dnodeGetDnodeId() { + int32_t dnodeId = 0; + pthread_mutex_lock(&tsConfig.mutex); + dnodeId = tsConfig.dnodeId; + pthread_mutex_unlock(&tsConfig.mutex); + return dnodeId; +} + +int64_t dnodeGetClusterId() { + int64_t clusterId = 0; + pthread_mutex_lock(&tsConfig.mutex); + clusterId = tsConfig.clusterId; + pthread_mutex_unlock(&tsConfig.mutex); + return clusterId; +} static struct { pthread_t *threadId; - bool stop; + bool threadStop; uint32_t rebootTime; } tsDnode; @@ -93,14 +489,14 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) { static void *dnodeThreadRoutine(void *param) { int32_t ms = tsStatusInterval * 1000; - while (!tsDnode.stop) { + while (!tsDnode.threadStop) { taosMsleep(ms); dnodeSendStatusMsg(); } } int32_t dnodeInitDnode() { - tsDnode.stop = false; + tsDnode.threadStop = false; tsDnode.rebootTime = taosGetTimestampSec(); tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL); if (tsDnode.threadId == NULL) { @@ -113,7 +509,7 @@ int32_t dnodeInitDnode() { void dnodeCleanupDnode() { if (tsDnode.threadId != NULL) { - tsDnode.stop = true; + tsDnode.threadStop = true; taosDestoryThread(tsDnode.threadId); tsDnode.threadId = NULL; } @@ -121,34 +517,6 @@ void dnodeCleanupDnode() { dInfo("dnode msg is cleanuped"); } -static int32_t dnodeStartMnode(SRpcMsg *pMsg) { - SCreateMnodeMsg *pCfg = pMsg->pCont; - pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->mnodeNum = htonl(pCfg->mnodeNum); - for (int32_t i = 0; i < pCfg->mnodeNum; ++i) { - pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId); - pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort); - } - - if (pCfg->dnodeId != dnodeGetDnodeId()) { - dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); - return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; - } - - if (mnodeGetStatus() == MN_STATUS_READY) return 0; - - return mnodeDeploy(); -} - -void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { - int32_t code = dnodeStartMnode(pMsg); - - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; - - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); -} - void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { SCfgDnodeMsg *pCfg = pMsg->pCont; diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c index df84896be724271f225d14cd172d9407bcc209c4..a1e7bc1c7e25ade263bec0147f4d9902fa25d2fa 100644 --- a/source/dnode/mgmt/src/dnodeInt.c +++ b/source/dnode/mgmt/src/dnodeInt.c @@ -14,8 +14,6 @@ */ #define _DEFAULT_SOURCE -#include "dnodeCheck.h" -#include "dnodeConfig.h" #include "dnodeDnode.h" #include "dnodeMnode.h" #include "dnodeTransport.h" @@ -53,46 +51,6 @@ static void dnodeReportStartupFinished(char *name, char *desc) { void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsInt.startup, sizeof(SStartupStep)); } -static int32_t dnodeInitMain() { - tsInt.runStatus = DN_RUN_STAT_STOPPED; - tscEmbedded = 1; - taosIgnSIGPIPE(); - taosBlockSIGPIPE(); - taosResolveCRC(); - taosInitGlobalCfg(); - taosReadGlobalLogCfg(); - taosSetCoreDump(tsEnableCoreFile); - - if (!taosMkDir(tsLogDir)) { - printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); - return -1; - } - - char temp[TSDB_FILENAME_LEN]; - sprintf(temp, "%s/taosdlog", tsLogDir); - if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { - printf("failed to init log file\n"); - } - - if (!taosReadGlobalCfg()) { - taosPrintGlobalCfg(); - dError("TDengine read global config failed"); - return -1; - } - - dInfo("start to initialize TDengine"); - - taosInitNotes(); - - return taosCheckGlobalCfg(); -} - -static void dnodeCleanupMain() { - taos_cleanup(); - taosCloseLog(); - taosStopCacheRefreshWorker(); -} - static int32_t dnodeCheckRunning(char *dir) { char filepath[256] = {0}; snprintf(filepath, sizeof(filepath), "%s/.running", dir); @@ -140,24 +98,65 @@ static int32_t dnodeInitDir() { return 0; } -static void dnodeCleanupDir() {} +static int32_t dnodeInitMain() { + tsInt.runStatus = DN_RUN_STAT_STOPPED; + tscEmbedded = 1; + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + taosInitGlobalCfg(); + taosReadGlobalLogCfg(); + taosSetCoreDump(tsEnableCoreFile); + + if (!taosMkDir(tsLogDir)) { + printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); + return -1; + } + + char temp[TSDB_FILENAME_LEN]; + sprintf(temp, "%s/taosdlog", tsLogDir); + if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { + printf("failed to init log file\n"); + } + + if (!taosReadGlobalCfg()) { + taosPrintGlobalCfg(); + dError("TDengine read global config failed"); + return -1; + } + + dInfo("start to initialize TDengine"); + + taosInitNotes(); + + if (taosCheckGlobalCfg() != 0) { + return -1; + } + + dnodeInitDir(); + + return -1; +} + +static void dnodeCleanupMain() { + taos_cleanup(); + taosCloseLog(); + taosStopCacheRefreshWorker(); +} int32_t dnodeInit() { SSteps *steps = taosStepInit(24, dnodeReportStartup); if (steps == NULL) return -1; taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); - taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir); - taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck); taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup); taosStepAdd(steps, "dnode-tfs", NULL, NULL); taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); - taosStepAdd(steps, "dnode-config", dnodeInitConfig, dnodeCleanupConfig); + taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode); taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes); taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode); taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); - taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode); tsInt.steps = steps; taosStepExec(tsInt.steps); diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index 47b1d89bb80a3913b2017d8123f5fc32b956ed39..6019fbd1f6b8d5af73595bc310e02c8db5a12a12 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "dnodeMnode.h" -#include "dnodeConfig.h" +#include "dnodeDnode.h" #include "dnodeTransport.h" #include "mnode.h" @@ -31,4 +31,32 @@ int32_t dnodeInitMnode() { return mnodeInit(para); } -void dnodeCleanupMnode() { mnodeCleanup(); } \ No newline at end of file +void dnodeCleanupMnode() { mnodeCleanup(); } + +static int32_t dnodeStartMnode(SRpcMsg *pMsg) { + SCreateMnodeMsg *pCfg = pMsg->pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->mnodeNum = htonl(pCfg->mnodeNum); + for (int32_t i = 0; i < pCfg->mnodeNum; ++i) { + pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId); + pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort); + } + + if (pCfg->dnodeId != dnodeGetDnodeId()) { + dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); + return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; + } + + if (mnodeGetStatus() == MN_STATUS_READY) return 0; + + return mnodeDeploy(); +} + +void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { + int32_t code = dnodeStartMnode(pMsg); + + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index 68ec0a44e58997bfe64f92dc3e6df43c052a5005..39747d4350a90254b41b96a87ab21b9eb239116c 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -21,7 +21,6 @@ #define _DEFAULT_SOURCE #include "dnodeTransport.h" -#include "dnodeConfig.h" #include "dnodeDnode.h" #include "dnodeMnode.h" #include "dnodeVnodes.h"