/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "dmMgmt.h" #define STR_CASE_CMP(s, d) (0 == strcasecmp((s), (d))) #define STR_STR_CMP(s, d) (strstr((s), (d))) #define STR_INT_CMP(s, d, c) (taosStr2Int32(s, 0, 10) c(d)) #define STR_STR_SIGN ("ia") #define DM_INIT_MON() \ do { \ code = (int32_t)(2147483648 | 298); \ strncpy(stName, tsVersionName, 64); \ monCfg.maxLogs = tsMonitorMaxLogs; \ monCfg.port = tsMonitorPort; \ monCfg.server = tsMonitorFqdn; \ monCfg.comp = tsMonitorComp; \ if (monInit(&monCfg) != 0) { \ if (terrno != 0) code = terrno; \ goto _exit; \ } \ } while (0) #define DM_ERR_RTN(c) \ do { \ code = (c); \ goto _exit; \ } while (0) static SDnode globalDnode = {0}; static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS", "FreeBSD", "openSUSE", "SLES", "Fedora", "macOS"}; SDnode *dmInstance() { return &globalDnode; } static int32_t dmCheckRepeatInit(SDnode *pDnode) { if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { dError("env is already initialized"); terrno = TSDB_CODE_REPEAT_INIT; return -1; } return 0; } static int32_t dmInitSystem() { taosIgnSIGPIPE(); taosBlockSIGPIPE(); taosResolveCRC(); return 0; } static int32_t dmInitMonitor() { int32_t code = 0; SMonCfg monCfg = {0}; char reName[64] = {0}; char stName[64] = {0}; char ver[64] = {0}; DM_INIT_MON(); if (STR_STR_CMP(stName, STR_STR_SIGN)) { DM_ERR_RTN(0); } if (taosGetOsReleaseName(reName, stName, ver, 64) != 0) { DM_ERR_RTN(code); } if (STR_CASE_CMP(stName, dmOS[0])) { if (STR_INT_CMP(ver, 17, >)) { DM_ERR_RTN(0); } } else if (STR_CASE_CMP(stName, dmOS[1])) { if (STR_INT_CMP(ver, 6, >)) { DM_ERR_RTN(0); } } else if (STR_STR_CMP(stName, dmOS[2]) || STR_STR_CMP(stName, dmOS[3]) || STR_STR_CMP(stName, dmOS[4]) || STR_STR_CMP(stName, dmOS[5]) || STR_STR_CMP(stName, dmOS[6]) || STR_STR_CMP(stName, dmOS[7]) || STR_STR_CMP(stName, dmOS[8]) || STR_STR_CMP(stName, dmOS[9])) { DM_ERR_RTN(0); } _exit: if (code) terrno = code; return code; } static bool dmCheckDiskSpace() { osUpdate(); // sufficiency if (!osDataSpaceSufficient()) { dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0); } if (!osLogSpaceSufficient()) { dWarn("free log disk size: %f GB, not sufficient, expected %f GB at least", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0); } if (!osTempSpaceSufficient()) { dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0); } // availability bool ret = true; if (!osDataSpaceAvailable()) { dError("data disk space unavailable, i.e. %s", tsDataDir); terrno = TSDB_CODE_NO_DISKSPACE; ret = false; } if (!osLogSpaceAvailable()) { dError("log disk space unavailable, i.e. %s", tsLogDir); terrno = TSDB_CODE_NO_DISKSPACE; ret = false; } if (!osTempSpaceAvailable()) { dError("temp disk space unavailable, i.e. %s", tsTempDir); terrno = TSDB_CODE_NO_DISKSPACE; ret = false; } return ret; } static bool dmCheckDataDirVersion() { char checkDataDirJsonFileName[PATH_MAX] = {0}; snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir); if (taosCheckExistFile(checkDataDirJsonFileName)) { dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!", tsDataDir); return false; } return true; } int32_t dmInit() { dInfo("start to init dnode env"); if (!dmCheckDataDirVersion()) return -1; if (!dmCheckDiskSpace()) return -1; if (dmCheckRepeatInit(dmInstance()) != 0) return -1; if (dmInitSystem() != 0) return -1; if (dmInitMonitor() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; dInfo("dnode env is initialized"); return 0; } static int32_t dmCheckRepeatCleanup(SDnode *pDnode) { if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { dError("dnode env is already cleaned up"); return -1; } return 0; } void dmCleanup() { dDebug("start to cleanup dnode env"); SDnode *pDnode = dmInstance(); if (dmCheckRepeatCleanup(pDnode) != 0) return; dmCleanupDnode(pDnode); monCleanup(); syncCleanUp(); walCleanUp(); udfcClose(); udfStopUdfd(); taosStopCacheRefreshWorker(); dInfo("dnode env is cleaned up"); taosCleanupCfg(); taosCloseLog(); } void dmStop() { SDnode *pDnode = dmInstance(); pDnode->stop = true; } int32_t dmRun() { SDnode *pDnode = dmInstance(); return dmRunDnode(pDnode); } static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper != NULL) { dmReleaseWrapper(pWrapper); switch (ntype) { case MNODE: terrno = TSDB_CODE_MNODE_ALREADY_DEPLOYED; break; case QNODE: terrno = TSDB_CODE_QNODE_ALREADY_DEPLOYED; break; case SNODE: terrno = TSDB_CODE_SNODE_ALREADY_DEPLOYED; break; default: terrno = TSDB_CODE_APP_ERROR; } dError("failed to create node since %s", terrstr()); return -1; } dInfo("start to process create-node-request"); pWrapper = &pDnode->wrappers[ntype]; if (taosMkDir(pWrapper->path) != 0) { dmReleaseWrapper(pWrapper); terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); return -1; } taosThreadMutexLock(&pDnode->mutex); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); dInfo("node:%s, start to create", pWrapper->name); int32_t code = (*pWrapper->func.createFp)(&input, pMsg); if (code != 0) { dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); } else { dInfo("node:%s, has been created", pWrapper->name); code = dmOpenNode(pWrapper); if (code == 0) { code = dmStartNode(pWrapper); } pWrapper->deployed = true; pWrapper->required = true; } taosThreadMutexUnlock(&pDnode->mutex); return code; } static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { dError("fail to process alter node type since node not exist"); return -1; } dmReleaseWrapper(pWrapper); dInfo("node:%s, start to process alter-node-type-request", pWrapper->name); pWrapper = &pDnode->wrappers[ntype]; if(pWrapper->func.nodeRoleFp != NULL){ ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt); dInfo("node:%s, checking node role:%d", pWrapper->name, role); if(role == TAOS_SYNC_ROLE_VOTER){ dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role); terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER; return -1; } } if(pWrapper->func.isCatchUpFp != NULL){ dInfo("node:%s, checking node catch up", pWrapper->name); if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){ terrno = TSDB_CODE_MNODE_NOT_CATCH_UP; return -1; } } dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name); taosThreadMutexLock(&pDnode->mutex); dInfo("node:%s, stopping node", pWrapper->name); dmStopNode(pWrapper); dInfo("node:%s, closing node", pWrapper->name); dmCloseNode(pWrapper); pWrapper = &pDnode->wrappers[ntype]; if (taosMkDir(pWrapper->path) != 0) { dmReleaseWrapper(pWrapper); terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); return -1; } SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); dInfo("node:%s, start to create", pWrapper->name); int32_t code = (*pWrapper->func.createFp)(&input, pMsg); if (code != 0) { dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); } else { dInfo("node:%s, has been created", pWrapper->name); code = dmOpenNode(pWrapper); if (code == 0) { code = dmStartNode(pWrapper); } pWrapper->deployed = true; pWrapper->required = true; } taosThreadMutexUnlock(&pDnode->mutex); return code; } static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { switch (ntype) { case MNODE: terrno = TSDB_CODE_MNODE_NOT_DEPLOYED; break; case QNODE: terrno = TSDB_CODE_QNODE_NOT_DEPLOYED; break; case SNODE: terrno = TSDB_CODE_SNODE_NOT_DEPLOYED; break; default: terrno = TSDB_CODE_APP_ERROR; } dError("failed to drop node since %s", terrstr()); return -1; } taosThreadMutexLock(&pDnode->mutex); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); dInfo("node:%s, start to drop", pWrapper->name); int32_t code = (*pWrapper->func.dropFp)(&input, pMsg); if (code != 0) { dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); } else { dInfo("node:%s, has been dropped", pWrapper->name); pWrapper->required = false; pWrapper->deployed = false; } dmReleaseWrapper(pWrapper); if (code == 0) { dmStopNode(pWrapper); dmCloseNode(pWrapper); taosRemoveDir(pWrapper->path); } taosThreadMutexUnlock(&pDnode->mutex); return code; } SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { SMgmtInputOpt opt = { .path = pWrapper->path, .name = pWrapper->name, .pData = &pWrapper->pDnode->data, .processCreateNodeFp = dmProcessCreateNodeReq, .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processDropNodeFp = dmProcessDropNodeReq, .sendMonitorReportFp = dmSendMonitorReport, .getVnodeLoadsFp = dmGetVnodeLoads, .getMnodeLoadsFp = dmGetMnodeLoads, .getQnodeLoadsFp = dmGetQnodeLoads, }; opt.msgCb = dmGetMsgcb(pWrapper->pDnode); return opt; } void dmReportStartup(const char *pName, const char *pDesc) { SStartupInfo *pStartup = &(dmInstance()->startup); tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); dDebug("step:%s, %s", pStartup->name, pStartup->desc); } int64_t dmGetClusterId() { return globalDnode.data.clusterId; }