diff --git a/source/dnode/mgmt/CMakeLists.txt b/source/dnode/mgmt/CMakeLists.txt index 1872e91a7e7114586723f2b889fcfb811f1819ed..3293d1967aebbf8661cbaf9f01958f0bf6250f79 100644 --- a/source/dnode/mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/CMakeLists.txt @@ -5,6 +5,7 @@ aux_source_directory(bnode/src DNODE_SRC) aux_source_directory(snode/src DNODE_SRC) aux_source_directory(vnode/src DNODE_SRC) aux_source_directory(mnode/src DNODE_SRC) +aux_source_directory(container/src DNODE_SRC) add_library(dnode STATIC ${DNODE_SRC}) target_link_libraries( @@ -19,6 +20,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/snode/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/vnode/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/mnode/inc" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/container/inc" ) add_subdirectory(exec) diff --git a/source/dnode/mgmt/bnode/src/bmMgmt.c b/source/dnode/mgmt/bnode/src/bmMgmt.c index d1ca08ff1e76df9cfd775d5df00a199d76253e55..935543bf0c02939eeccaccef9305bd51c969b00e 100644 --- a/source/dnode/mgmt/bnode/src/bmMgmt.c +++ b/source/dnode/mgmt/bnode/src/bmMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE // #include "dndBnode.h" -// #include "dndMgmt.h" +// #include "dmMgmt.h" // #include "dndTransport.h" // #include "dndWorker.h" diff --git a/source/dnode/mgmt/dnode/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h similarity index 98% rename from source/dnode/mgmt/dnode/inc/dndInt.h rename to source/dnode/mgmt/container/inc/dndInt.h index dff88e75a0a92f63254f4d86ca59e80635b05b26..5193c8bb3b1a94af5f731b3f1be73cc5971f7352 100644 --- a/source/dnode/mgmt/dnode/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -69,8 +69,7 @@ typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg); - -typedef SMgmtWrapper *(*OpenNodeFp)(SDnode *pDnode, const char *path); +typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SDnode *pDnode, SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); typedef int32_t (*MgmtHandleMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg); @@ -254,7 +253,7 @@ SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) ; void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); -SMgmtFp dndGetMgmtFp(); +SMgmtFp dmGetMgmtFp(); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/inc/dndMain.h b/source/dnode/mgmt/container/inc/dndMain.h similarity index 100% rename from source/dnode/mgmt/dnode/inc/dndMain.h rename to source/dnode/mgmt/container/inc/dndMain.h diff --git a/source/dnode/mgmt/dnode/inc/dndMonitor.h b/source/dnode/mgmt/container/inc/dndMonitor.h similarity index 100% rename from source/dnode/mgmt/dnode/inc/dndMonitor.h rename to source/dnode/mgmt/container/inc/dndMonitor.h diff --git a/source/dnode/mgmt/dnode/inc/dndTransport.h b/source/dnode/mgmt/container/inc/dndTransport.h similarity index 96% rename from source/dnode/mgmt/dnode/inc/dndTransport.h rename to source/dnode/mgmt/container/inc/dndTransport.h index aa58f6c3d42e5511a8a0c258b4a3758c3bf109eb..1a925ee92349f8856a0afcad3cd7d9e40972360b 100644 --- a/source/dnode/mgmt/dnode/inc/dndTransport.h +++ b/source/dnode/mgmt/container/inc/dndTransport.h @@ -24,6 +24,7 @@ extern "C" { int32_t dndInitTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode); +int32_t dndInitServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/dnode/inc/dndWorker.h b/source/dnode/mgmt/container/inc/dndWorker.h similarity index 100% rename from source/dnode/mgmt/dnode/inc/dndWorker.h rename to source/dnode/mgmt/container/inc/dndWorker.h diff --git a/source/dnode/mgmt/dnode/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c similarity index 99% rename from source/dnode/mgmt/dnode/src/dndInt.c rename to source/dnode/mgmt/container/src/dndInt.c index b9276beb6e4c6e9819a55b48df1271d0c9a3c8b9..b68f143c792d3113629d1a33b46b5d1b5800ed90 100644 --- a/source/dnode/mgmt/dnode/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "dndInt.h" -#include "dndHandle.h" +#include "dmHandle.h" #include "dndTransport.h" #include "vmInt.h" diff --git a/source/dnode/mgmt/dnode/src/dndMain.c b/source/dnode/mgmt/container/src/dndMain.c similarity index 88% rename from source/dnode/mgmt/dnode/src/dndMain.c rename to source/dnode/mgmt/container/src/dndMain.c index 2fcd00f735d73a2fa538ad1d5cb5f394bb30eeb4..ada39e6d4a840cd70b380a1e9f76443bc70d31b9 100644 --- a/source/dnode/mgmt/dnode/src/dndMain.c +++ b/source/dnode/mgmt/container/src/dndMain.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "dndMain.h" -#include "dndMgmt.h" +#include "dmMgmt.h" #include "dndTransport.h" #include "bmInt.h" @@ -57,36 +57,9 @@ static void dndClearMemory(SDnode *pDnode) { } static int32_t dndInitResource(SDnode *pDnode) { - SDiskCfg dCfg = {0}; - tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN); - dCfg.level = 0; - dCfg.primary = 1; - SDiskCfg *pDisks = pDnode->cfg.pDisks; - int32_t numOfDisks = pDnode->cfg.numOfDisks; - if (numOfDisks <= 0 || pDisks == NULL) { - pDisks = &dCfg; - numOfDisks = 1; - } - pDnode->pTfs = tfsOpen(pDisks, numOfDisks); - if (pDnode->pTfs == NULL) { - dError("failed to init tfs since %s", terrstr()); - return -1; - } - if (dndInitMgmt(pDnode) != 0) { - dError("failed to init mgmt since %s", terrstr()); - return -1; - } - - if (dndInitTrans(pDnode) != 0) { - dError("failed to init transport since %s", terrstr()); - return -1; - } - dndSetStatus(pDnode, DND_STAT_RUNNING); - dndSendStatusReq(pDnode); - dndReportStartup(pDnode, "TDengine", "initialized successfully"); return 0; } @@ -120,7 +93,7 @@ SDnode *dndCreate(SDndCfg *pCfg) { goto _OVER; } - pDnode->wrappers[DNODE].fp = dndGetMgmtFp(); + pDnode->wrappers[DNODE].fp = dmGetMgmtFp(); pDnode->wrappers[MNODE].fp = mmGetMgmtFp(); pDnode->wrappers[VNODES].fp = vmGetMgmtFp(); pDnode->wrappers[QNODE].fp = qmGetMgmtFp(); @@ -216,7 +189,7 @@ static int32_t dndOpenNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { // return 0; - pWrapper->pMgmt = (*pWrapper->fp.openFp)(pDnode, pWrapper->path); + (*pWrapper->fp.openFp)(pWrapper); return 0; } diff --git a/source/dnode/mgmt/dnode/src/dndMonitor.c b/source/dnode/mgmt/container/src/dndMonitor.c similarity index 99% rename from source/dnode/mgmt/dnode/src/dndMonitor.c rename to source/dnode/mgmt/container/src/dndMonitor.c index 53ae4bc3618f771c71d73a5839a7837f9cc9f0ce..41665df6f63a3ed06f5d0f50c419b18a4f234e2b 100644 --- a/source/dnode/mgmt/dnode/src/dndMonitor.c +++ b/source/dnode/mgmt/container/src/dndMonitor.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "dndMonitor.h" -#include "dndMgmt.h" +#include "dmMgmt.h" static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); diff --git a/source/dnode/mgmt/dnode/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c similarity index 99% rename from source/dnode/mgmt/dnode/src/dndTransport.c rename to source/dnode/mgmt/container/src/dndTransport.c index 94aad909f29b8df549d883c297cd2725c18a3b16..87efcd8c2730fd7ac15c9e51623c3de98d1eb223 100644 --- a/source/dnode/mgmt/dnode/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "dndTransport.h" -#include "dndMgmt.h" +#include "dmMgmt.h" #include "mmInt.h" #define INTERNAL_USER "_dnd" @@ -207,7 +207,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char return rpcRsp.code; } -static int32_t dndInitServer(SDnode *pDnode) { + int32_t dndInitServer(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); diff --git a/source/dnode/mgmt/dnode/src/dndWorker.c b/source/dnode/mgmt/container/src/dndWorker.c similarity index 100% rename from source/dnode/mgmt/dnode/src/dndWorker.c rename to source/dnode/mgmt/container/src/dndWorker.c diff --git a/source/dnode/mgmt/dnode/inc/dndFile.h b/source/dnode/mgmt/dnode/inc/dmFile.h similarity index 90% rename from source/dnode/mgmt/dnode/inc/dndFile.h rename to source/dnode/mgmt/dnode/inc/dmFile.h index 07049d04b9aa04cca9f85f95eae198f831d27933..64cd00f9ff40d45dfb0bb7b4faf1e2e3f568408b 100644 --- a/source/dnode/mgmt/dnode/inc/dndFile.h +++ b/source/dnode/mgmt/dnode/inc/dmFile.h @@ -22,8 +22,8 @@ extern "C" { #endif -int32_t dndReadFile(SDnode *pDnode); -int32_t dndWriteFile(SDnode *pDnode); +int32_t dmReadFile(SDnode *pDnode); +int32_t dmWriteFile(SDnode *pDnode); void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps); void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps); diff --git a/source/dnode/mgmt/dnode/inc/dndHandle.h b/source/dnode/mgmt/dnode/inc/dmHandle.h similarity index 80% rename from source/dnode/mgmt/dnode/inc/dndHandle.h rename to source/dnode/mgmt/dnode/inc/dmHandle.h index 277c4f9f22cffc5eddb14db24dae917e71e44d65..e9ae27ba7390dbdb62419a3a575e09a0e6272132 100644 --- a/source/dnode/mgmt/dnode/inc/dndHandle.h +++ b/source/dnode/mgmt/dnode/inc/dmHandle.h @@ -23,7 +23,11 @@ extern "C" { #endif void dndInitMsgHandles(SMgmtWrapper *pWrapper); -SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); +SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); + + +void dndSendStatusReq(SDnode *pDnode); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h new file mode 100644 index 0000000000000000000000000000000000000000..af65949eaf9ebb6abc8fe179dd2adc265a221491 --- /dev/null +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_DNODE_INT_H_ +#define _TD_DND_DNODE_INT_H_ + +#include "dndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +SMgmtFp dmGetMgmtFp(); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_DNODE_INT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dndMgmt.h b/source/dnode/mgmt/dnode/inc/dmMgmt.h similarity index 100% rename from source/dnode/mgmt/dnode/inc/dndMgmt.h rename to source/dnode/mgmt/dnode/inc/dmMgmt.h diff --git a/source/dnode/mgmt/dnode/inc/dmWorker.h b/source/dnode/mgmt/dnode/inc/dmWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..f809df1d09d3a3d53c0b88cd334f1d4dd09b2bbe --- /dev/null +++ b/source/dnode/mgmt/dnode/inc/dmWorker.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_DNODE_WORKER_H_ +#define _TD_DND_DNODE_WORKER_H_ + +#include "dmInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dmStartWorker(); +void dmStopWorker(); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_DNODE_WORKER_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dndFile.c b/source/dnode/mgmt/dnode/src/dmFile.c similarity index 98% rename from source/dnode/mgmt/dnode/src/dndFile.c rename to source/dnode/mgmt/dnode/src/dmFile.c index f903fc6e3350d953406743e1eefd239fed0e65f2..2adcf0bfa8b0a9f2a6becf312e24ec36820795c7 100644 --- a/source/dnode/mgmt/dnode/src/dndFile.c +++ b/source/dnode/mgmt/dnode/src/dmFile.c @@ -14,9 +14,9 @@ */ #define _DEFAULT_SOURCE -#include "dndFile.h" +#include "dmFile.h" -int32_t dndReadFile(SDnode *pDnode) { +int32_t dmReadFile(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); @@ -153,7 +153,7 @@ PRASE_DNODE_OVER: return 0; } -int32_t dndWriteFile(SDnode *pDnode) { +int32_t dmWriteFile(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; char file[PATH_MAX]; @@ -220,12 +220,12 @@ void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) { int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); if (numOfEps != numOfEpsOld) { dndResetDnodes(pDnode, pDnodeEps); - dndWriteFile(pDnode); + dmWriteFile(pDnode); } else { int32_t size = numOfEps * sizeof(SDnodeEp); if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { dndResetDnodes(pDnode, pDnodeEps); - dndWriteFile(pDnode); + dmWriteFile(pDnode); } } diff --git a/source/dnode/mgmt/dnode/src/dndHandle.c b/source/dnode/mgmt/dnode/src/dmHandle.c similarity index 95% rename from source/dnode/mgmt/dnode/src/dndHandle.c rename to source/dnode/mgmt/dnode/src/dmHandle.c index 988fb86ae3a3f289727e78b003aada0c5454aa84..5fcc71d4ed1936814db628e332c87970783b27b6 100644 --- a/source/dnode/mgmt/dnode/src/dndHandle.c +++ b/source/dnode/mgmt/dnode/src/dmHandle.c @@ -14,9 +14,9 @@ */ #define _DEFAULT_SOURCE -#include "dndHandle.h" +#include "dmHandle.h" #include "dndWorker.h" -#include "dndMgmt.h" +#include "dmMgmt.h" static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; @@ -52,7 +52,7 @@ void dndInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dndProcessMgmtMsg); } -SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { +SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; return pMgmt->msgHandles[msgIndex]; } diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/source/dnode/mgmt/dnode/src/dndMgmt.c b/source/dnode/mgmt/dnode/src/dmMgmt.c similarity index 84% rename from source/dnode/mgmt/dnode/src/dndMgmt.c rename to source/dnode/mgmt/dnode/src/dmMgmt.c index 0ceb316307f42bac75d7212dbef8a16807a66f5c..17fdc56ebffecbd35cab1cfff3c846299a211bdc 100644 --- a/source/dnode/mgmt/dnode/src/dndMgmt.c +++ b/source/dnode/mgmt/dnode/src/dmMgmt.c @@ -14,9 +14,11 @@ */ #define _DEFAULT_SOURCE -#include "dndMgmt.h" - -#include "dndHandle.h" +#include "dmMgmt.h" +#include "dmWorker.h" +// #include "dmMgmt.h" +#include "dmFile.h" +#include "dmHandle.h" #include "dndMonitor.h" // #include "dndBnode.h" // #include "mm.h" @@ -30,9 +32,6 @@ #if 0 static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); -static int32_t dndReadFile(SDnode *pDnode); -static int32_t dndWriteFile(SDnode *pDnode); -static void *dnodeThreadRoutine(void *param); static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq); static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp); @@ -164,7 +163,7 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; pMgmt->clusterId = pCfg->clusterId; - dndWriteFile(pDnode); + dmWriteFile(pDnode); taosWUnLockLatch(&pMgmt->latch); } } @@ -176,7 +175,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); pMgmt->dropped = 1; - dndWriteFile(pDnode); + dmWriteFile(pDnode); } } else { SStatusRsp statusRsp = {0}; @@ -216,84 +215,6 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { rpcSendResponse(&rpcRsp); } -static void *dnodeThreadRoutine(void *param) { - SDnode *pDnode = param; - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - int64_t lastStatusTime = taosGetTimestampMs(); - int64_t lastMonitorTime = lastStatusTime; - - setThreadName("dnode-hb"); - - while (true) { - pthread_testcancel(); - taosMsleep(200); - if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) { - continue; - } - - int64_t curTime = taosGetTimestampMs(); - - float statusInterval = (curTime - lastStatusTime) / 1000.0f; - if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) { - dndSendStatusReq(pDnode); - lastStatusTime = curTime; - } - - float monitorInterval = (curTime - lastMonitorTime) / 1000.0f; - if (monitorInterval >= tsMonitorInterval) { - dndSendMonitorReport(pDnode); - lastMonitorTime = curTime; - } - } -} - -int32_t dndInitMgmt(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - - pMgmt->dnodeId = 0; - pMgmt->rebootTime = taosGetTimestampMs(); - pMgmt->dropped = 0; - pMgmt->clusterId = 0; - taosInitRWLatch(&pMgmt->latch); - - pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - if (pMgmt->dnodeHash == NULL) { - dError("failed to init dnode hash"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (dndReadFile(pDnode) != 0) { - dError("failed to read file:%s since %s", pMgmt->file, terrstr()); - return -1; - } - - if (pMgmt->dropped) { - dError("dnode not start since its already dropped"); - return -1; - } - - if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); - return -1; - } - - if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); - return -1; - } - - pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); - if (pMgmt->threadId == NULL) { - dError("failed to init dnode thread"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dInfo("dnode-mgmt is initialized"); - return 0; -} - void dndStopMgmt(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; dndCleanupWorker(&pMgmt->mgmtWorker); @@ -431,7 +352,6 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { -int32_t dndInitMgmt(SDnode *pDnode) {return 0;} void dndStopMgmt(SDnode *pDnode) {} void dndCleanupMgmt(SDnode *pDnode){} @@ -446,13 +366,80 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {} void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq){} void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} -bool dndRequireNode(SMgmtWrapper *pWrapper) { return true; } +static int32_t dmInit(SMgmtWrapper *pWrapper) { + SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt)); + + pMgmt->dnodeId = 0; + pMgmt->rebootTime = taosGetTimestampMs(); + pMgmt->dropped = 0; + pMgmt->clusterId = 0; + taosInitRWLatch(&pMgmt->latch); + + pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pMgmt->dnodeHash == NULL) { + dError("node:%s, failed to init dnode hash", pWrapper->name); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (dmReadFile(pWrapper->pDnode) != 0) { + dError("node:%s, failed to read file since %s", pWrapper->name, terrstr()); + return -1; + } + + if (pMgmt->dropped) { + dError("node:%s, will not start since its already dropped", pWrapper->name); + return -1; + } + + if (dmStartWorker(pMgmt) != 0) { + dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr()); + return -1; + } + + dInfo("dnode-mgmt is initialized"); + return 0; + + // dndSetStatus(pDnode, DND_STAT_RUNNING); + // dndSendStatusReq(pDnode); + // dndReportStartup(pDnode, "TDengine", "initialized successfully"); + +#if 0 + if (dndInitTrans(pDnode) != 0) { + dError("failed to init transport since %s", terrstr()); + return -1; + } + + SDiskCfg dCfg = {0}; + tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN); + dCfg.level = 0; + dCfg.primary = 1; + SDiskCfg *pDisks = pDnode->cfg.pDisks; + int32_t numOfDisks = pDnode->cfg.numOfDisks; + if (numOfDisks <= 0 || pDisks == NULL) { + pDisks = &dCfg; + numOfDisks = 1; + } + + pDnode->pTfs = tfsOpen(pDisks, numOfDisks); + if (pDnode->pTfs == NULL) { + dError("failed to init tfs since %s", terrstr()); + return -1; + } +#endif +} + +static void dmCleanup(SDnode *pDnode, SMgmtWrapper *pWrapper){ + +} + +static bool dmRequire(SMgmtWrapper *pWrapper) { return true; } -SMgmtFp dndGetMgmtFp() { +SMgmtFp dmGetMgmtFp() { SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = NULL; - mgmtFp.closeFp = NULL; - mgmtFp.requiredFp = dndRequireNode; - mgmtFp.getMsgHandleFp = dndGetMsgHandle; + mgmtFp.openFp = dmInit; + mgmtFp.closeFp = dmCleanup; + mgmtFp.requiredFp = dmRequire; + mgmtFp.getMsgHandleFp = dmGetMsgHandle; return mgmtFp; } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c new file mode 100644 index 0000000000000000000000000000000000000000..b99162830af3273dfff1b986759a345e50ed99fc --- /dev/null +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dmWorker.h" +#include "dndWorker.h" +#include "dmHandle.h" + + +static void *dnodeThreadRoutine(void *param) { + SDnode *pDnode = param; + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + int64_t lastStatusTime = taosGetTimestampMs(); + int64_t lastMonitorTime = lastStatusTime; + + setThreadName("dnode-hb"); + + while (true) { + pthread_testcancel(); + taosMsleep(200); + if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) { + continue; + } + + int64_t curTime = taosGetTimestampMs(); + + float statusInterval = (curTime - lastStatusTime) / 1000.0f; + if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) { + dndSendStatusReq(pDnode); + lastStatusTime = curTime; + } + + // float monitorInterval = (curTime - lastMonitorTime) / 1000.0f; + // if (monitorInterval >= tsMonitorInterval) { + // dndSendMonitorReport(pDnode); + // lastMonitorTime = curTime; + // } + } +} + + +static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = 0; + +#if 0 + switch (pMsg->msgType) { + case TDMT_DND_CREATE_MNODE: + code = mmProcessCreateMnodeReq(pDnode, pMsg); + break; + case TDMT_DND_ALTER_MNODE: + code = mmProcessAlterMnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_MNODE: + code = mmProcessDropMnodeReq(pDnode, pMsg); + break; + case TDMT_DND_CREATE_QNODE: + code = dndProcessCreateQnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_QNODE: + code = dndProcessDropQnodeReq(pDnode, pMsg); + break; + case TDMT_DND_CREATE_SNODE: + code = dndProcessCreateSnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_SNODE: + code = dndProcessDropSnodeReq(pDnode, pMsg); + break; + case TDMT_DND_CREATE_BNODE: + code = dndProcessCreateBnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_BNODE: + code = dndProcessDropBnodeReq(pDnode, pMsg); + break; + case TDMT_DND_CONFIG_DNODE: + code = dndProcessConfigDnodeReq(pDnode, pMsg); + break; + case TDMT_MND_STATUS_RSP: + dndProcessStatusRsp(pDnode, pMsg); + break; + case TDMT_MND_AUTH_RSP: + dndProcessAuthRsp(pDnode, pMsg); + break; + case TDMT_MND_GRANT_RSP: + dndProcessGrantRsp(pDnode, pMsg); + break; + case TDMT_DND_CREATE_VNODE: + code = dndProcessCreateVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_ALTER_VNODE: + code = dndProcessAlterVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_VNODE: + code = dndProcessDropVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_SYNC_VNODE: + code = dndProcessSyncVnodeReq(pDnode, pMsg); + break; + case TDMT_DND_COMPACT_VNODE: + code = dndProcessCompactVnodeReq(pDnode, pMsg); + break; + default: + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = -1; + dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); + break; + } +#endif + if (pMsg->msgType & 1u) { + if (code != 0) code = terrno; + SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; + rpcSendResponse(&rsp); + } + + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + taosFreeQitem(pMsg); +} + +int32_t dmStartWorker(SDnodeMgmt *pMgmt) { + if (dndInitWorker(NULL, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) { + dError("failed to start dnode mgmt worker since %s", terrstr()); + return -1; + } + + if (dndInitWorker(NULL, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) { + dError("failed to start dnode mgmt worker since %s", terrstr()); + return -1; + } + +// pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); +// if (pMgmt->threadId == NULL) { +// dError("failed to init dnode thread"); +// terrno = TSDB_CODE_OUT_OF_MEMORY; +// return -1; +// } + + return 0; +} + +void dmStopWorker(SDnodeMgmt *pMgmt) { + #if 0 + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + + taosWLockLatch(&pMgmt->latch); + pMgmt->deployed = 0; + taosWUnLockLatch(&pMgmt->latch); + + while (pMgmt->refCount > 1) { + taosMsleep(10); + } + + dndCleanupWorker(&pMgmt->readWorker); + dndCleanupWorker(&pMgmt->writeWorker); + dndCleanupWorker(&pMgmt->syncWorker); + #endif +} diff --git a/source/dnode/mgmt/mnode/src/mmHandle.c b/source/dnode/mgmt/mnode/src/mmHandle.c index a44c05ccf23638f0f8e1b3c82f08b5019965cedd..ec206fc500e7c608e58a8468db4d4482282871d2 100644 --- a/source/dnode/mgmt/mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mnode/src/mmHandle.c @@ -18,7 +18,7 @@ #include "mmWorker.h" #if 0 -#include "dndMgmt.h" +#include "dmMgmt.h" int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SDCreateMnodeReq createReq = {0}; diff --git a/source/dnode/mgmt/mnode/src/mmMgmt.c b/source/dnode/mgmt/mnode/src/mmMgmt.c index 2abbefae2274e431d210454a2280d93a22e8520a..87319fba5833f8c0ff49f07533940f0978a95f3c 100644 --- a/source/dnode/mgmt/mnode/src/mmMgmt.c +++ b/source/dnode/mgmt/mnode/src/mmMgmt.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -#include "dndMgmt.h" +#include "dmMgmt.h" #include "dndTransport.h" #if 0 diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index f5b6c58fe57fd35aa40c305d4556def7f292eb4b..895efb2f66df9276617c516dadde50dd7d1d180a 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -#include "dndMgmt.h" +#include "dmMgmt.h" #include "dndTransport.h" #include "dndWorker.h" diff --git a/source/dnode/mgmt/qnode/src/qmMgmt.c b/source/dnode/mgmt/qnode/src/qmMgmt.c index a2c9963772865c5348e0e4809de7711b2a73d0fa..c733b4056f9649daab93c4aff7bfee31f48a4429 100644 --- a/source/dnode/mgmt/qnode/src/qmMgmt.c +++ b/source/dnode/mgmt/qnode/src/qmMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE // #include "dndQnode.h" -// #include "dndMgmt.h" +// #include "dmMgmt.h" // #include "dndTransport.h" // #include "dndWorker.h" diff --git a/source/dnode/mgmt/snode/src/smMgmt.c b/source/dnode/mgmt/snode/src/smMgmt.c index a5e3bcefc0dea00d41a550867d37f8d1469e4a7c..6c4de57705b8262d8082a528bdc83c408ad5c95e 100644 --- a/source/dnode/mgmt/snode/src/smMgmt.c +++ b/source/dnode/mgmt/snode/src/smMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE // #include "dndSnode.h" -// #include "dndMgmt.h" +// #include "dmMgmt.h" // #include "dndTransport.h" // #include "dndWorker.h" diff --git a/source/dnode/mgmt/vnode/src/vmMgmt.c b/source/dnode/mgmt/vnode/src/vmMgmt.c index 03bd84779c02e82a6d5eb77ff6b85c17314b2f91..dec217782f522468829241ca770efb87d50ca80c 100644 --- a/source/dnode/mgmt/vnode/src/vmMgmt.c +++ b/source/dnode/mgmt/vnode/src/vmMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "vmMgmt.h" -#include "dndMgmt.h" +#include "dmMgmt.h" #include "dndTransport.h" // #include "sync.h"