diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index fb30ef324caf54900fc4e31a13f521453a7d789e..7d5cabccb357fab44422d60f69dd36fe205a3853 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -45,7 +45,7 @@ typedef void (*SendRspFp)(const SRpcMsg* pRsp); typedef void (*SendRedirectRspFp)(const SRpcMsg* pRsp, const SEpSet* pNewEpSet); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); -typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc); +typedef void (*ReportStartup)(const char* name, const char* desc); typedef struct { SMgmtWrapper* pWrapper; diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 10ae1e32ed07766218b4c5ab8f0cc8abf40f3a84..fe1692c50fc181c969b3dcea0dd751868666468d 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -40,6 +40,11 @@ void dmCleanup(); */ int32_t dmRun(); +/** + * @brief Stop dnode. + */ +void dmStop(); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index cbe5268e9f9c1b6b17575a5bdd3f0f4f50b5e81b..43e0b87beb39dcd38871f04bee87d3e1af2863be 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -71,9 +71,5 @@ void tmsgReleaseHandle(void* handle, int8_t type) { void tmsgReportStartup(const char* name, const char* desc) { ReportStartup fp = tsDefaultMsgCb.reportStartupFp; - if (fp != NULL && tsDefaultMsgCb.pWrapper != NULL) { - (*fp)(tsDefaultMsgCb.pWrapper, name, desc); - } else { - terrno = TSDB_CODE_INVALID_PTR; - } + (*fp)(name, desc); } \ No newline at end of file diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 5dc896d0485236be76a9cf45ffb22a5961ada69d..6a03ff1bba5d01f201baeb3d824cbb9384f2825b 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -36,16 +36,10 @@ static struct { char apolloUrl[PATH_MAX]; const char **envCmd; SArray *pArgs; // SConfigPair - SDnode *pDnode; EDndNodeType ntype; } global = {0}; -static void dmStopDnode(int signum, void *info, void *ctx) { - SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode); - if (pDnode != NULL) { - dmSetEvent(pDnode, DND_EVENT_STOP); - } -} +static void dmStopDnode(int signum, void *info, void *ctx) { dmStop(); } static void dmSetSignalHandle() { taosSetSignal(SIGTERM, dmStopDnode); @@ -69,8 +63,8 @@ static void dmSetSignalHandle() { static int32_t dmParseArgs(int32_t argc, char const *argv[]) { int32_t cmdEnvIndex = 0; if (argc < 2) return 0; - global.envCmd = taosMemoryMalloc((argc-1)*sizeof(char*)); - memset(global.envCmd, 0, (argc-1)*sizeof(char*)); + global.envCmd = taosMemoryMalloc((argc - 1) * sizeof(char *)); + memset(global.envCmd, 0, (argc - 1) * sizeof(char *)); for (int32_t i = 1; i < argc; ++i) { if (strcmp(argv[i], "-c") == 0) { if (i < argc - 1) { @@ -102,7 +96,8 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) { } else if (strcmp(argv[i], "-e") == 0) { global.envCmd[cmdEnvIndex] = argv[++i]; cmdEnvIndex++; - } else if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || strcmp(argv[i], "-?")) { + } else if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || + strcmp(argv[i], "-?")) { global.printHelp = true; } else { } @@ -144,23 +139,6 @@ static void dmDumpCfg() { cfgDumpCfg(pCfg, 0, true); } -static SDnodeOpt dmGetOpt() { - SConfig *pCfg = taosGetCfg(); - SDnodeOpt option = {0}; - - option.numOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; - tstrncpy(option.dataDir, tsDataDir, sizeof(option.dataDir)); - tstrncpy(option.firstEp, tsFirst, sizeof(option.firstEp)); - tstrncpy(option.secondEp, tsSecond, sizeof(option.firstEp)); - option.serverPort = tsServerPort; - tstrncpy(option.localFqdn, tsLocalFqdn, sizeof(option.localFqdn)); - snprintf(option.localEp, sizeof(option.localEp), "%s:%u", option.localFqdn, option.serverPort); - option.disks = tsDiskCfg; - option.numOfDisks = tsDiskCfgNum; - option.ntype = global.ntype; - return option; -} - static int32_t dmInitLog() { char logName[12] = {0}; snprintf(logName, sizeof(logName), "%slog", dmNodeLogName(global.ntype)); @@ -175,34 +153,6 @@ static void dmSetProcInfo(int32_t argc, char **argv) { } } -static int32_t dmRunDnode() { - if (dmInit() != 0) { - dError("failed to init environment since %s", terrstr()); - return -1; - } - - SDnodeOpt option = dmGetOpt(); - SDnode *pDnode = dmCreate(&option); - if (pDnode == NULL) { - dError("failed to to create dnode since %s", terrstr()); - return -1; - } else { - global.pDnode = pDnode; - dmSetSignalHandle(); - } - - dInfo("start to run dnode"); - int32_t code = dmRun(pDnode); - dInfo("shutting down the service"); - - global.pDnode = NULL; - dmClose(pDnode); - dmCleanup(); - taosCloseLog(); - taosCleanupCfg(); - return code; -} - static void taosCleanupArgs() { if (global.envCmd != NULL) taosMemoryFree(global.envCmd); } @@ -259,5 +209,17 @@ int main(int argc, char const *argv[]) { dmSetProcInfo(argc, (char **)argv); taosCleanupArgs(); - return dmRunDnode(); + + if (dmInit(global.ntype) != 0) { + dError("failed to init dnode since %s", terrstr()); + return -1; + } + + dInfo("start to run dnode"); + dmSetSignalHandle(); + int32_t code = dmRun(); + dInfo("shutting down the service"); + + dmCleanup(); + return code; } diff --git a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h index 847df28f78a45b8019b9c26c1a98665762ad6f6e..e7738ff43d7c38df3e6112ce7859014cdcd8e4f7 100644 --- a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h +++ b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h @@ -25,11 +25,11 @@ extern "C" { #endif typedef struct SBnodeMgmt { + SDnodeData *pData; SBnode *pBnode; SMsgCb msgCb; const char *path; const char *name; - int32_t dnodeId; SMultiWorker writeWorker; SSingleWorker monitorWorker; } SBnodeMgmt; diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index 407d698faae7a9f233478bb0b1c5f15d2b37fa43..5cee69fa94e17706e63175a8164b651641ac0548 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -76,7 +76,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop bnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c index 34408eb61713ec9dab8867ede9b8af9ed4b0517b..1fd3aab1d9a19d61accd810e1276f54fa5b0f57d 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c @@ -39,9 +39,9 @@ int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } + pMgmt->pData = pInput->pData; pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.pMgmt = pMgmt; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 1dd78c5a409831b268227da0443405139f84694a..f90fd72c6b5a836f9bb38b185cea058eb71ff3bb 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -23,7 +23,6 @@ extern "C" { #endif typedef struct SDnodeMgmt { - struct SDnode *pDnode; SDnodeData *pData; SMsgCb msgCb; const char *path; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index db72e9123b2fb90215ca6012508fe3f10ea6be5c..7afa86a3776e4ebfb29ab151d5842d9ffac1e735 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -59,8 +59,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.rebootTime = pMgmt->pData->rebootTime; req.updateTime = pMgmt->pData->updateTime; req.numOfCores = tsNumOfCores; - req.numOfSupportVnodes = pMgmt->pData->supportVnodes; - tstrncpy(req.dnodeEp, pMgmt->pData->localEp, TSDB_EP_LEN); + req.numOfSupportVnodes = tsNumOfSupportVnodes; + tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN); req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.checkTime = 0; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 5f8e015f1463354bf78b31d1b732f0ba45c2306a..3b343d491642229da7b0e6db63301476853856e6 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -39,7 +39,6 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } - pMgmt->pDnode = pInput->pDnode; pMgmt->pData = pInput->pData; pMgmt->msgCb = pInput->msgCb; pMgmt->path = pInput->path; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c b/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c index 801ec89ac2fbfa79de866e6d32a5a35089427d7a..3547c769377733ac9b60a5e3859f395e469505f0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c @@ -16,19 +16,18 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \ - if (!tsMultiProcess) { \ - SRpcMsg rsp = {0}; \ - SRpcMsg req = {.msgType = mtype}; \ - SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ - tstrncpy(epset.eps[0].fqdn, pMgmt->pData->localFqdn, TSDB_FQDN_LEN); \ - epset.eps[0].port = pMgmt->pData->serverPort; \ - \ - rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \ - if (rsp.code == 0 && rsp.contLen > 0) { \ - func(rsp.pCont, rsp.contLen, pInfo); \ - } \ - rpcFreeCont(rsp.pCont); \ +#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \ + if (!tsMultiProcess) { \ + SRpcMsg rsp = {0}; \ + SRpcMsg req = {.msgType = mtype}; \ + SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ + tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \ + epset.eps[0].port = tsServerPort; \ + rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \ + if (rsp.code == 0 && rsp.contLen > 0) { \ + func(rsp.pCont, rsp.contLen, pInfo); \ + } \ + rpcFreeCont(rsp.pCont); \ } static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) { @@ -40,10 +39,10 @@ static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) { static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) { pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f); - pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, MNODE); - pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, QNODE); - pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, SNODE); - pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, BNODE); + pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(MNODE); + pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(QNODE); + pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(SNODE); + pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(BNODE); tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); pInfo->logdir.size = tsLogSpace.size; tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 3316c4ebf4a3d770f070970f1ab47212d640fd88..7daf25bb8ad871b62c4ebe09d788356967d6ae4c 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -116,28 +116,28 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { code = dmProcessGrantRsp(pMgmt, pMsg); break; case TDMT_DND_CREATE_MNODE: - code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, MNODE, pMsg); + code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg); break; case TDMT_DND_DROP_MNODE: - code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, MNODE, pMsg); + code = (*pMgmt->processDropNodeFp)(MNODE, pMsg); break; case TDMT_DND_CREATE_QNODE: - code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, QNODE, pMsg); + code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg); break; case TDMT_DND_DROP_QNODE: - code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, QNODE, pMsg); + code = (*pMgmt->processDropNodeFp)(QNODE, pMsg); break; case TDMT_DND_CREATE_SNODE: - code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, SNODE, pMsg); + code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg); break; case TDMT_DND_DROP_SNODE: - code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, SNODE, pMsg); + code = (*pMgmt->processDropNodeFp)(SNODE, pMsg); break; case TDMT_DND_CREATE_BNODE: - code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, BNODE, pMsg); + code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg); break; case TDMT_DND_DROP_BNODE: - code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, BNODE, pMsg); + code = (*pMgmt->processDropNodeFp)(BNODE, pMsg); break; case TDMT_DND_SERVER_STATUS: code = dmProcessServerRunStatus(pMgmt, pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 2a8c12a9098a1ecad0792b10d477f19ad3725d3e..23e1458f61e1723630165128d8e2ecd81c5c6b7a 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -24,11 +24,11 @@ extern "C" { #endif typedef struct SMnodeMgmt { + SDnodeData *pData; SMnode *pMnode; SMsgCb msgCb; const char *path; const char *name; - int32_t dnodeId; SSingleWorker queryWorker; SSingleWorker readWorker; SSingleWorker writeWorker; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 44a54c27403fd712c98e1966465304360927a6f2..35e382da196052669df560e9f665e0e5b03c5a93 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -109,7 +109,7 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop mnode since %s", terrstr()); return -1; @@ -133,9 +133,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if (pMgmt->dnodeId != 0 && alterReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->pData->dnodeId != 0 && alterReq.dnodeId != pMgmt->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->dnodeId); + dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->pData->dnodeId); return -1; } else { return mmAlter(pMgmt, &alterReq); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index b6f52118923ee2fb8065cb61de2050e9a4a69a63..8445889954859d4b71d6fb5859bc8751ea3c98d1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -70,7 +70,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre pReplica->id = pCreate->replicas[i].id; pReplica->port = pCreate->replicas[i].port; memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pMgmt->dnodeId) { + if (pReplica->id == pMgmt->pData->dnodeId) { pOption->selfIndex = i; } } @@ -128,9 +128,9 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } + pMgmt->pData = pInput->pData; pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; @@ -148,7 +148,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { SMnodeOpt option = {0}; if (!deployed) { dInfo("mnode start to deploy"); - pMgmt->dnodeId = 1; + pMgmt->pData->dnodeId = 1; mmBuildOptionForDeploy(pMgmt, pInput, &option); } else { dInfo("mnode start to open"); @@ -178,7 +178,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } } - pInput->pData->dnodeId = pMgmt->dnodeId; + pInput->pData->dnodeId = pMgmt->pData->dnodeId; pOutput->pMgmt = pMgmt; return 0; } diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index 66950030206300dd77a82fdeb91fa7c5229b5aa6..6221438ebaddfc5184cbdcaace57a70a666b9a74 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -25,11 +25,11 @@ extern "C" { #endif typedef struct SQnodeMgmt { + SDnodeData *pData; SQnode *pQnode; SMsgCb msgCb; const char *path; const char *name; - int32_t dnodeId; SSingleWorker queryWorker; SSingleWorker fetchWorker; SSingleWorker monitorWorker; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 25993e2d5b37b42f7c93996b6ed0ae041027271a..ec4cc39c82826206147e609f7a4352bcca4b174a 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -76,7 +76,7 @@ int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop qnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index 93cf1523573b7f1ddda1757880dee7b6ab5db0ee..a40f95041bc0e961c514dbf088526146a69266ce 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -39,9 +39,9 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } + pMgmt->pData = pInput->pData; pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue; diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 5d112f51a44420aa6d041ddf14b300234db35855..c9aa8364542a5165b2ca1c5812ab5747fc5a9ece 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -25,11 +25,11 @@ extern "C" { #endif typedef struct SSnodeMgmt { + SDnodeData *pData; SSnode *pSnode; SMsgCb msgCb; const char *path; const char *name; - int32_t dnodeId; SRWLatch latch; int8_t uniqueWorkerInUse; SArray *uniqueWorkers; // SArray diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 76f25af994d791bff1e13ca1bfb9a9f8172989d9..99c68341ce4fab45eea4751c2c2141342351dd2c 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -76,7 +76,7 @@ int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { + if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop snode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 80eb0e91ec31628c9e9cf70458ed95a16eaf1cd4..25d632d565aaca7153dd6e35c7cc07f8d4bc7ce0 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -40,9 +40,9 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } + pMgmt->pData = pInput->pData; pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.pMgmt = pMgmt; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index db72781be1fc6a2bcb79fc02f7fd3b2889ad361b..7fc10c4237cd2af28cc649644b39d9d134361c49 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,10 +26,10 @@ extern "C" { #endif typedef struct SVnodeMgmt { + SDnodeData *pData; SMsgCb msgCb; const char *path; const char *name; - int32_t dnodeId; SQWorkerPool queryPool; SQWorkerPool fetchPool; SWWorkerPool syncPool; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 28f6f5f60f28248fbd1f3609644644b11a6ea5ad..287d49c4f5eeb0c7edbfe9f70be862bfe8ee4644 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -247,9 +247,9 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt)); if (pMgmt == NULL) goto _OVER; + pMgmt->pData = pInput->pData; pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index a36324ec5f261d99ebad5b37baf10cd46d5bab6e..51b518653fb7d55b32e12adef789e2da6d177326 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -65,29 +65,29 @@ typedef struct { } SProc; typedef struct SMgmtWrapper { - SDnode *pDnode; - SMgmtFunc func; - void *pMgmt; - const char *name; - char *path; - int32_t refCount; - SRWLatch latch; - EDndNodeType ntype; - bool deployed; - bool required; - SProc proc; - NodeMsgFp msgFps[TDMT_MAX]; + struct SDnode *pDnode; + SMgmtFunc func; + void *pMgmt; + const char *name; + char *path; + int32_t refCount; + SRWLatch latch; + EDndNodeType ntype; + bool deployed; + bool required; + SProc proc; + NodeMsgFp msgFps[TDMT_MAX]; } SMgmtWrapper; typedef struct { EDndNodeType defaultNtype; bool needCheckVgId; -} SMsgHandle; +} SDnodeHandle; typedef struct { - void *serverRpc; - void *clientRpc; - SMsgHandle msgHandles[TDMT_MAX]; + void *serverRpc; + void *clientRpc; + SDnodeHandle msgHandles[TDMT_MAX]; } SDnodeTrans; typedef struct { @@ -110,9 +110,10 @@ typedef struct SUdfdData { } SUdfdData; typedef struct SDnode { + int8_t once; + bool stop; EDndProcType ptype; EDndNodeType rtype; - EDndEvent event; EDndRunStatus status; SStartupInfo startup; SDnodeTrans trans; @@ -123,23 +124,26 @@ typedef struct SDnode { SMgmtWrapper wrappers[NODE_END]; } SDnode; -// dmExec.c -int32_t dmOpenNode(SMgmtWrapper *pWrapper); -void dmCloseNode(SMgmtWrapper *pWrapper); +// dmEmv.c +void dmReportStartup(const char *pName, const char *pDesc); -// dmObj.c +// dmMgmt.c +int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype); +void dmCleanupDnode(SDnode *pDnode); SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType); int32_t dmMarkWrapper(SMgmtWrapper *pWrapper); void dmReleaseWrapper(SMgmtWrapper *pWrapper); SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); -void dmSetEvent(SDnode *pDnode, EDndEvent event); -void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); -void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc); void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); +// dmNodes.c +int32_t dmOpenNode(SMgmtWrapper *pWrapper); +void dmCloseNode(SMgmtWrapper *pWrapper); +int32_t dmRunDnode(SDnode *pDnode); + // dmProc.c int32_t dmInitProc(struct SMgmtWrapper *pWrapper); void dmCleanupProc(struct SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index c6b7a5bd9859f7fe44793b23b38c0cf4f686fd8b..d5073409507fda741ec22574149d406bb926424a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -16,23 +16,10 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static struct { - int8_t once; - EDndProcType ptype; - EDndNodeType rtype; - EDndEvent event; - EDndRunStatus status; - SStartupInfo startup; - SDnodeTrans trans; - SUdfdData udfdData; - TdThreadMutex mutex; - TdFilePtr lockfile; - SDnodeData data; - SMgmtWrapper wrappers[NODE_END]; -} global; - -static int32_t dmCheckRepeatInit() { - if (atomic_val_compare_exchange_8(&global.once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { +static SDnode global = {0}; + +static int32_t dmCheckRepeatInit(SDnode *pDnode) { + if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { dError("env is already initialized"); terrno = TSDB_CODE_REPEAT_INIT; return -1; @@ -60,19 +47,19 @@ static int32_t dmInitMonitor() { return 0; } -int32_t dmInit() { +int32_t dmInit(int8_t rtype) { dInfo("start to init env"); - if (dmCheckRepeatInit() != 0) return -1; + if (dmCheckRepeatInit(&global) != 0) return -1; if (dmInitSystem() != 0) return -1; if (dmInitMonitor() != 0) return -1; - // if (dmInit) + if (dmInitDnode(&global, rtype) != 0) return -1; dInfo("env is initialized"); return 0; } -static int32_t dmCheckRepeatCleanup() { - if (atomic_val_compare_exchange_8(&global.once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { +static int32_t dmCheckRepeatCleanup(SDnode *pDnode) { + if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { dError("env is already cleaned up"); return -1; } @@ -82,7 +69,7 @@ static int32_t dmCheckRepeatCleanup() { void dmCleanup() { dDebug("start to cleanup env"); if (dmCheckRepeatCleanup != 0) return; - + dmCleanupDnode(&global); monCleanup(); syncCleanUp(); walCleanUp(); @@ -90,4 +77,111 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dInfo("env is cleaned up"); + + taosCloseLog(); + taosCleanupCfg(); +} + +void dmStop() { + SDnode *pDnode = &global; + pDnode->stop = true; +} + +int32_t dmRun() { + SDnode *pDnode = &global; + return dmRunDnode(pDnode); +} + +static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { + SDnode *pDnode = &global; + + SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); + if (pWrapper != NULL) { + dmReleaseWrapper(pWrapper); + terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; + dError("failed to create node since %s", terrstr()); + return -1; + } + + taosThreadMutexLock(&pDnode->mutex); + pWrapper = &pDnode->wrappers[ntype]; + + if (taosMkDir(pWrapper->path) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); + return -1; + } + + SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); + + int32_t code = (*pWrapper->func.createFp)(&input, pMsg); + if (code != 0) { + dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); + } else { + dDebug("node:%s, has been created", pWrapper->name); + (void)dmOpenNode(pWrapper); + pWrapper->required = true; + pWrapper->deployed = true; + pWrapper->proc.ptype = pDnode->ptype; + } + + taosThreadMutexUnlock(&pDnode->mutex); + return code; +} + +static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { + SDnode *pDnode = &global; + + SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); + if (pWrapper == NULL) { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + dError("failed to drop node since %s", terrstr()); + return -1; + } + + taosThreadMutexLock(&pDnode->mutex); + + int32_t code = (*pWrapper->func.dropFp)(pWrapper->pMgmt, pMsg); + if (code != 0) { + dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); + } else { + dDebug("node:%s, has been dropped", pWrapper->name); + pWrapper->required = false; + pWrapper->deployed = false; + } + + dmReleaseWrapper(pWrapper); + + if (code == 0) { + dmCloseNode(pWrapper); + taosRemoveDir(pWrapper->path); + } + taosThreadMutexUnlock(&pDnode->mutex); + return code; +} + +static bool dmIsNodeRequired(EDndNodeType ntype) { + SDnode *pDnode = &global; + return pDnode->wrappers[ntype].required; +} + +SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { + SMgmtInputOpt opt = { + .path = pWrapper->path, + .name = pWrapper->name, + .pData = &pWrapper->pDnode->data, + .processCreateNodeFp = dmProcessCreateNodeReq, + .processDropNodeFp = dmProcessDropNodeReq, + .isNodeRequiredFp = dmIsNodeRequired, + }; + + opt.msgCb = dmGetMsgcb(pWrapper); + return opt; +} + +void dmReportStartup(const char *pName, const char *pDesc) { + SStartupInfo *pStartup = &global.startup; + tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); + tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); + dDebug("step:%s, %s", pStartup->name, pStartup->desc); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index e53ff3255505c9b66e9d9c93788c6fb0bf6a09e8..b8cb147ac4640ca3eab5a6740205bffc8886067d 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -16,8 +16,6 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static bool dmIsNodeRequired(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; } - static bool dmRequireNode(SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); @@ -39,8 +37,8 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) { return required; } -static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { - pDnode->rtype = pOption->ntype; +static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) { + pDnode->rtype = rtype; if (tsMultiProcess == 0) { pDnode->ptype = DND_PROC_SINGLE; @@ -65,21 +63,6 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { pData->rebootTime = taosGetTimestampMs(); pData->dropped = 0; pData->stopped = 0; - pData->localEp = strdup(pOption->localEp); - pData->localFqdn = strdup(pOption->localFqdn); - pData->firstEp = strdup(pOption->firstEp); - pData->secondEp = strdup(pOption->secondEp); - pData->supportVnodes = pOption->numOfSupportVnodes; - pData->serverPort = pOption->serverPort; - pData->numOfDisks = pOption->numOfDisks; - pData->disks = pOption->disks; - pData->dataDir = strdup(pOption->dataDir); - - if (pData->dataDir == NULL || pData->localEp == NULL || pData->localFqdn == NULL || pData->firstEp == NULL || - pData->secondEp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pData->dnodeHash == NULL) { @@ -126,30 +109,17 @@ static void dmClearVars(SDnode *pDnode) { } taosWUnLockLatch(&pData->latch); - taosMemoryFreeClear(pData->localEp); - taosMemoryFreeClear(pData->localFqdn); - taosMemoryFreeClear(pData->firstEp); - taosMemoryFreeClear(pData->secondEp); - taosMemoryFreeClear(pData->dataDir); - taosThreadMutexDestroy(&pDnode->mutex); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); taosMemoryFree(pDnode); } -SDnode *dmCreate(const SDnodeOpt *pOption) { +int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { dInfo("start to create dnode"); int32_t code = -1; char path[PATH_MAX + 100] = {0}; - SDnode *pDnode = NULL; - - pDnode = taosMemoryCalloc(1, sizeof(SDnode)); - if (pDnode == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - if (dmInitVars(pDnode, pOption) != 0) { + if (dmInitVars(pDnode, rtype) != 0) { goto _OVER; } @@ -162,7 +132,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - pWrapper->pDnode = pDnode; pWrapper->name = dmNodeName(ntype); pWrapper->ntype = ntype; pWrapper->proc.wrapper = pWrapper; @@ -174,7 +143,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { } taosInitRWLatch(&pWrapper->latch); - snprintf(path, sizeof(path), "%s%s%s", pOption->dataDir, TD_DIRSEP, pWrapper->name); + snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); if (pWrapper->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -195,7 +164,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { } if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) { - pDnode->lockfile = dmCheckRunning(pOption->dataDir); + pDnode->lockfile = dmCheckRunning(tsDataDir); if (pDnode->lockfile == NULL) { goto _OVER; } @@ -210,7 +179,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { goto _OVER; } - dmReportStartup(pDnode, "dnode-transport", "initialized"); + dmReportStartup("dnode-transport", "initialized"); dInfo("dnode is created, ptr:%p", pDnode); code = 0; @@ -221,10 +190,10 @@ _OVER: dError("failed to create dnode since %s", terrstr()); } - return pDnode; + return code; } -void dmClose(SDnode *pDnode) { +void dmCleanupDnode(SDnode *pDnode) { if (pDnode == NULL) return; dmCleanupClient(pDnode); @@ -240,12 +209,6 @@ void dmSetStatus(SDnode *pDnode, EDndRunStatus status) { } } -void dmSetEvent(SDnode *pDnode, EDndEvent event) { - if (event == DND_EVENT_STOP) { - pDnode->event = event; - } -} - SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pRetWrapper = pWrapper; @@ -288,17 +251,6 @@ void dmReleaseWrapper(SMgmtWrapper *pWrapper) { dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount); } -void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { - SStartupInfo *pStartup = &pDnode->startup; - tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); - tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); - dDebug("step:%s, %s", pStartup->name, pStartup->desc); -} - -void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) { - dmReportStartup(pWrapper->pDnode, pName, pDesc); -} - static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { SDnodeMgmt *pMgmt = pDnode->wrappers[DNODE].pMgmt; pStatus->details[0] = 0; @@ -315,7 +267,7 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("net test req is received"); - SRpcMsg rsp = {.info = pReq->info, .code = 0}; + SRpcMsg rsp = {.code = 0, .info = pReq->info}; rsp.pCont = rpcMallocCont(pReq->contLen); if (rsp.pCont == NULL) { rsp.code = TSDB_CODE_OUT_OF_MEMORY; @@ -353,82 +305,3 @@ _OVER: rpcSendResponse(&rspMsg); rpcFreeCont(pReq->pCont); } - -static int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SRpcMsg *pMsg) { - SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); - if (pWrapper != NULL) { - dmReleaseWrapper(pWrapper); - terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; - dError("failed to create node since %s", terrstr()); - return -1; - } - - taosThreadMutexLock(&pDnode->mutex); - pWrapper = &pDnode->wrappers[ntype]; - - if (taosMkDir(pWrapper->path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); - return -1; - } - - SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); - - int32_t code = (*pWrapper->func.createFp)(&input, pMsg); - if (code != 0) { - dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); - } else { - dDebug("node:%s, has been created", pWrapper->name); - (void)dmOpenNode(pWrapper); - pWrapper->required = true; - pWrapper->deployed = true; - pWrapper->proc.ptype = pDnode->ptype; - } - - taosThreadMutexUnlock(&pDnode->mutex); - return code; -} - -static int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SRpcMsg *pMsg) { - SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); - if (pWrapper == NULL) { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - dError("failed to drop node since %s", terrstr()); - return -1; - } - - taosThreadMutexLock(&pDnode->mutex); - - int32_t code = (*pWrapper->func.dropFp)(pWrapper->pMgmt, pMsg); - if (code != 0) { - dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); - } else { - dDebug("node:%s, has been dropped", pWrapper->name); - pWrapper->required = false; - pWrapper->deployed = false; - } - - dmReleaseWrapper(pWrapper); - - if (code == 0) { - dmCloseNode(pWrapper); - taosRemoveDir(pWrapper->path); - } - taosThreadMutexUnlock(&pDnode->mutex); - return code; -} - -SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { - SMgmtInputOpt opt = { - .pDnode = pWrapper->pDnode, - .pData = &pWrapper->pDnode->data, - .processCreateNodeFp = dmProcessCreateNodeReq, - .processDropNodeFp = dmProcessDropNodeReq, - .isNodeRequiredFp = dmIsNodeRequired, - .name = pWrapper->name, - .path = pWrapper->path, - }; - - opt.msgCb = dmGetMsgcb(pWrapper); - return opt; -} \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index 43748fb688aa00abd873a2fc0fd67620c55b0d5d..f6d7916f95511c69f7fa61c773c3378ad1f1574a 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -136,7 +136,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { pWrapper->pMgmt = output.pMgmt; } - dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned"); + dmReportStartup(pWrapper->name, "openned"); return 0; } @@ -151,7 +151,7 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) { dDebug("node:%s, has been started", pWrapper->name); } - dmReportStartup(pWrapper->pDnode, pWrapper->name, "started"); + dmReportStartup(pWrapper->name, "started"); return 0; } @@ -221,7 +221,7 @@ static int32_t dmStartNodes(SDnode *pDnode) { } dInfo("TDengine initialized successfully"); - dmReportStartup(pDnode, "TDengine", "initialized successfully"); + dmReportStartup("TDengine", "initialized successfully"); return 0; } @@ -260,7 +260,7 @@ static void dmWatchNodes(SDnode *pDnode) { taosThreadMutexUnlock(&pDnode->mutex); } -int32_t dmRun(SDnode *pDnode) { +int32_t dnRunDnode(SDnode *pDnode) { if (dmOpenNodes(pDnode) != 0) { dError("failed to open nodes since %s", terrstr()); return -1; @@ -272,7 +272,7 @@ int32_t dmRun(SDnode *pDnode) { } while (1) { - if (pDnode->event & DND_EVENT_STOP) { + if (!pDnode->stop) { dInfo("dnode is about to stop"); dmSetStatus(pDnode, DND_STAT_STOPPED); dmStopNodes(pDnode); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a76d4eac81a27576c88c72a126c829ed5b26ed47..4a32bc710aa3b3c6d62eadfa1aa263b1fe07e0f0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -52,7 +52,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t code = -1; SRpcMsg *pMsg = NULL; bool needRelease = false; - SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; + SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; SMgmtWrapper *pWrapper = NULL; dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), @@ -178,7 +178,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { SMgmtHandle *pMgmt = taosArrayGet(pArray, i); - SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; + SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; } @@ -201,7 +201,7 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) { + if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { epSet.inUse = (i + 1) % epSet.numOfEps; } @@ -410,8 +410,8 @@ int32_t dmInitServer(SDnode *pDnode) { SRpcInit rpcInit = {0}; - strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn)); - rpcInit.localPort = pDnode->data.serverPort; + strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn)); + rpcInit.localPort = tsServerPort; rpcInit.label = "DND"; rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; @@ -449,7 +449,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { .sendRedirectRspFp = dmSendRedirectRsp, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .releaseHandleFp = dmReleaseHandle, - .reportStartupFp = dmReportStartupByWrapper, + .reportStartupFp = dmReportStartup, }; return msgCb; } diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 22e6fcf34ce7bc48aa37ad5b616a94d549b8be00..4b353cb6cdf97bb60e368954eaebbe5271d62290 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -92,18 +92,18 @@ typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef bool (*IsNodeRequiredFp)(EDndNodeType ntype); typedef struct { - int32_t dnodeId; - int64_t clusterId; - int64_t dnodeVer; - int64_t updateTime; - int64_t rebootTime; - bool dropped; - bool stopped; - SEpSet mnodeEps; - SArray *dnodeEps; - SHashObj *dnodeHash; - SRWLatch latch; - SMsgCb msgCb; + int32_t dnodeId; + int64_t clusterId; + int64_t dnodeVer; + int64_t updateTime; + int64_t rebootTime; + bool dropped; + bool stopped; + SEpSet mnodeEps; + SArray *dnodeEps; + SHashObj *dnodeHash; + SRWLatch latch; + SMsgCb msgCb; } SDnodeData; typedef struct {