diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h
index 8d85ac6627162e6175f43119fc439df87784dd85..165b1acfb6cacf38ee1d58024d360db790917cf0 100644
--- a/src/common/inc/tglobal.h
+++ b/src/common/inc/tglobal.h
@@ -35,6 +35,7 @@ extern int32_t tsNumOfMnodes;
extern int32_t tsEnableVnodeBak;
extern int32_t tsEnableTelemetryReporting;
extern char tsEmail[];
+extern char tsArbitrator[];
// common
extern int tsRpcTimer;
diff --git a/src/dnode/inc/dnodeCfg.h b/src/dnode/inc/dnodeCfg.h
index 35d889646021a079abe18c01a26fa5891718aaa0..d74303f3252e1fa54e174876bab5758be541955e 100644
--- a/src/dnode/inc/dnodeCfg.h
+++ b/src/dnode/inc/dnodeCfg.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitCfg();
void dnodeCleanupCfg();
diff --git a/src/dnode/inc/dnodeCheck.h b/src/dnode/inc/dnodeCheck.h
index a4880b3c114342c8900337372bd1dfd07e7815b7..c94b9e93197f4bfc9a175ee5825fd2c1c25d7072 100644
--- a/src/dnode/inc/dnodeCheck.h
+++ b/src/dnode/inc/dnodeCheck.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitCheck();
void dnodeCleanupCheck();
diff --git a/src/dnode/inc/dnodeEps.h b/src/dnode/inc/dnodeEps.h
index 2a203498c1f270c7e456694a4e1e195cbd9022cd..a5840997b0f9623c8ed504fa1c2eb8209216ea29 100644
--- a/src/dnode/inc/dnodeEps.h
+++ b/src/dnode/inc/dnodeEps.h
@@ -19,8 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
-
-#include "taosmsg.h"
+#include "dnodeInt.h"
int32_t dnodeInitEps();
void dnodeCleanupEps();
diff --git a/src/dnode/inc/dnodeInt.h b/src/dnode/inc/dnodeInt.h
index f4cbee1d1333e3f1ffc213f9e2ecdeb5f4c32d57..7595f5fd02fde85e64cf47d283d3e167cfa751d1 100644
--- a/src/dnode/inc/dnodeInt.h
+++ b/src/dnode/inc/dnodeInt.h
@@ -19,8 +19,13 @@
#ifdef __cplusplus
extern "C" {
#endif
-
+#include "taoserror.h"
+#include "taosmsg.h"
#include "tlog.h"
+#include "trpc.h"
+#include "tglobal.h"
+#include "dnode.h"
+#include "vnode.h"
extern int32_t dDebugFlag;
diff --git a/src/dnode/inc/dnodeMInfos.h b/src/dnode/inc/dnodeMInfos.h
index 9c3c85c47e2dbcc11c5b5a80fbf091bd93855149..4bd0eeec097b58109c5260ce0c2d2d3cce389199 100644
--- a/src/dnode/inc/dnodeMInfos.h
+++ b/src/dnode/inc/dnodeMInfos.h
@@ -19,8 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
-
-#include "taosmsg.h"
+#include "dnodeInt.h"
int32_t dnodeInitMInfos();
void dnodeCleanupMInfos();
@@ -29,6 +28,10 @@ void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetMInfos(SMnodeInfos *minfos);
bool dnodeIsMasterEp(char *ep);
+void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
+void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
+void dnodeGetEpSetForShell(SRpcEpSet *epSet);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/dnode/inc/dnodeMPeer.h b/src/dnode/inc/dnodeMPeer.h
index 00221baa221a411d614c1fa1bb3dc4525de2ed71..b7e566d7e4952d6afbd88e64b36197de4f0f4401 100644
--- a/src/dnode/inc/dnodeMPeer.h
+++ b/src/dnode/inc/dnodeMPeer.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitMPeer();
void dnodeCleanupMPeer();
diff --git a/src/dnode/inc/dnodeMRead.h b/src/dnode/inc/dnodeMRead.h
index 8a8e71227ddf3cd7b7c77ef664ac3d4c935889db..279098d30e137bc6b57ef09ce93b8d12fd421082 100644
--- a/src/dnode/inc/dnodeMRead.h
+++ b/src/dnode/inc/dnodeMRead.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitMRead();
void dnodeCleanupMRead();
diff --git a/src/dnode/inc/dnodeMWrite.h b/src/dnode/inc/dnodeMWrite.h
index 6a3d41bc81a03465a2e097bc711c83a8d8af362e..8d4fcce3befbad35b47b4817c02662adc37addf8 100644
--- a/src/dnode/inc/dnodeMWrite.h
+++ b/src/dnode/inc/dnodeMWrite.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitMWrite();
void dnodeCleanupMWrite();
diff --git a/src/dnode/inc/dnodeMain.h b/src/dnode/inc/dnodeMain.h
index c1480407bde0bbcf7aada20f90084b96fc380a73..ca79d53afdc0b211f3903b9323f7e567d0045093 100644
--- a/src/dnode/inc/dnodeMain.h
+++ b/src/dnode/inc/dnodeMain.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitSystem();
void dnodeCleanUpSystem();
diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h
index 8618de324446ca3f2504d15a6422bcca3a4b51b0..edcefbdd0c9d6ce7e9d5ae84f383f6fa15fc2417 100644
--- a/src/dnode/inc/dnodeModule.h
+++ b/src/dnode/inc/dnodeModule.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitModules();
void dnodeStartModules();
diff --git a/src/dnode/inc/dnodePeer.h b/src/dnode/inc/dnodePeer.h
index 0dcf48f2322be91de792148165b2a127fa983196..6d337ef6dc65d717c382ebaf59a50cf334686a64 100644
--- a/src/dnode/inc/dnodePeer.h
+++ b/src/dnode/inc/dnodePeer.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitServer();
void dnodeCleanupServer();
diff --git a/src/dnode/inc/dnodeShell.h b/src/dnode/inc/dnodeShell.h
index 300c86c5999553b2332b3224fc6c65ef019dc5af..3fa66d6a3bc24d253ede6076f9b6bdacdb14db7d 100644
--- a/src/dnode/inc/dnodeShell.h
+++ b/src/dnode/inc/dnodeShell.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitShell();
void dnodeCleanupShell();
diff --git a/src/dnode/inc/dnodeStep.h b/src/dnode/inc/dnodeStep.h
index b260cb8d79309c9604440bae024cae1cc4a3615a..8b1065dfd8d4cf326995d3581cb8f07ee6a10215 100644
--- a/src/dnode/inc/dnodeStep.h
+++ b/src/dnode/inc/dnodeStep.h
@@ -19,7 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
-#include "dnode.h"
+#include "dnodeInt.h"
int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize);
void dnodeStepCleanup(SStep *pSteps, int32_t stepSize);
diff --git a/src/dnode/inc/dnodeTelemetry.h b/src/dnode/inc/dnodeTelemetry.h
index 6fb62556ae50ec3759c7dc2260d16f36d1daf025..e4fd5a0376e3e7fdca0bbe1be15fa1cd70fd7695 100644
--- a/src/dnode/inc/dnodeTelemetry.h
+++ b/src/dnode/inc/dnodeTelemetry.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitTelemetry();
void dnodeCleanupTelemetry();
diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeVMgmt.h
similarity index 50%
rename from src/dnode/inc/dnodeMgmt.h
rename to src/dnode/inc/dnodeVMgmt.h
index 2038ef5286b32522b11409ba5a253b33228b984d..9421a789249706a0c9bad2408566e859eee26a6b 100644
--- a/src/dnode/inc/dnodeMgmt.h
+++ b/src/dnode/inc/dnodeVMgmt.h
@@ -13,32 +13,17 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_DNODE_MGMT_H
-#define TDENGINE_DNODE_MGMT_H
+#ifndef TDENGINE_DNODE_VMGMT_H
+#define TDENGINE_DNODE_VMGMT_H
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
-#include "trpc.h"
-
-int32_t dnodeInitMgmt();
-void dnodeCleanupMgmt();
-int32_t dnodeInitMgmtTimer();
-void dnodeCleanupMgmtTimer();
-void dnodeDispatchToMgmtQueue(SRpcMsg *rpcMsg);
-
-void* dnodeGetVnode(int32_t vgId);
-int32_t dnodeGetVnodeStatus(void *pVnode);
-void* dnodeGetVnodeRworker(void *pVnode);
-void* dnodeGetVnodeWworker(void *pVnode);
-void* dnodeGetVnodeWal(void *pVnode);
-void* dnodeGetVnodeTsdb(void *pVnode);
-void dnodeReleaseVnode(void *pVnode);
-
-void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
-void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
-void dnodeGetEpSetForShell(SRpcEpSet *epSet);
+int32_t dnodeInitVMgmt();
+void dnodeCleanupVMgmt();
+void dnodeDispatchToVMgmtQueue(SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h
index 5b17693146cb68ab66943593bc2f9cd6587cc225..30dfb1b3a47509c1c3c5568520de0f092adba2c4 100644
--- a/src/dnode/inc/dnodeVRead.h
+++ b/src/dnode/inc/dnodeVRead.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitVRead();
void dnodeCleanupVRead();
diff --git a/src/dnode/inc/dnodeVWrite.h b/src/dnode/inc/dnodeVWrite.h
index 759e9ca8a5599236d08228ddd87ed8d4f8c55dca..2ddff210f8620cfe0544cb5925f633cab8d496af 100644
--- a/src/dnode/inc/dnodeVWrite.h
+++ b/src/dnode/inc/dnodeVWrite.h
@@ -19,6 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
+#include "dnodeInt.h"
int32_t dnodeInitVWrite();
void dnodeCleanupVWrite();
diff --git a/src/dnode/inc/dnodeVnodes.h b/src/dnode/inc/dnodeVnodes.h
new file mode 100644
index 0000000000000000000000000000000000000000..a942a00c783a8f98a5a5fa448edbc894188d622f
--- /dev/null
+++ b/src/dnode/inc/dnodeVnodes.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef TDENGINE_DNODE_VNODES_H
+#define TDENGINE_DNODE_VNODES_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#include "dnodeInt.h"
+
+int32_t dnodeInitVnodes();
+void dnodeCleanupVnodes();
+int32_t dnodeInitTimer();
+void dnodeCleanupTimer();
+void dnodeSendStatusMsgToMnode();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/dnode/src/dnodeCfg.c b/src/dnode/src/dnodeCfg.c
index 16d109a13a7600c5414b1f1edd175a8f15aa4cf0..89249d773b7c67b8939a7a70f65882f89f8f5190 100644
--- a/src/dnode/src/dnodeCfg.c
+++ b/src/dnode/src/dnodeCfg.c
@@ -16,9 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
-#include "tglobal.h"
-#include "dnode.h"
-#include "dnodeInt.h"
#include "dnodeCfg.h"
static SDnodeCfg tsCfg = {0};
diff --git a/src/dnode/src/dnodeCheck.c b/src/dnode/src/dnodeCheck.c
index a9ee4ac649c2f4b2734ce7f0dd59004b08e2fb67..be26bb967bf7e987e9e990b25fb046f2b8664841 100644
--- a/src/dnode/src/dnodeCheck.c
+++ b/src/dnode/src/dnodeCheck.c
@@ -15,8 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "tglobal.h"
-#include "dnodeInt.h"
#include "dnodeCheck.h"
typedef struct {
diff --git a/src/dnode/src/dnodeEps.c b/src/dnode/src/dnodeEps.c
index 83f294e05eb0e559638fef1259ad96ac6f162460..e1c93ce7ed00baa231d4061dc68bceb8d6631cbe 100644
--- a/src/dnode/src/dnodeEps.c
+++ b/src/dnode/src/dnodeEps.c
@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
-#include "tglobal.h"
#include "hash.h"
-#include "dnode.h"
-#include "dnodeInt.h"
#include "dnodeEps.h"
static SDnodeEps *tsEps = NULL;
diff --git a/src/dnode/src/dnodeMInfos.c b/src/dnode/src/dnodeMInfos.c
index cefe44aebe7f87803141ce3d75c45dca18463849..d2aa77822b3c33d864feb48f96314bd1697c42ff 100644
--- a/src/dnode/src/dnodeMInfos.c
+++ b/src/dnode/src/dnodeMInfos.c
@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
-#include "tglobal.h"
#include "mnode.h"
-#include "dnode.h"
-#include "dnodeInt.h"
#include "dnodeMInfos.h"
static SMnodeInfos tsMInfos;
@@ -286,3 +283,25 @@ static int32_t dnodeWriteMInfos() {
dInfo("successed to write %s", file);
return 0;
}
+
+void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
+ SRpcConnInfo connInfo = {0};
+ rpcGetConnInfo(rpcMsg->handle, &connInfo);
+
+ SRpcEpSet epSet = {0};
+ if (forShell) {
+ dnodeGetEpSetForShell(&epSet);
+ } else {
+ dnodeGetEpSetForPeer(&epSet);
+ }
+
+ dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
+ taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
+
+ for (int32_t i = 0; i < epSet.numOfEps; ++i) {
+ dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
+ epSet.port[i] = htons(epSet.port[i]);
+ }
+
+ rpcSendRedirectRsp(rpcMsg->handle, &epSet);
+}
diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c
index ee6dc5212e4fdb98d4bfb7c3a7678fb6bd81bc8c..0863666f762e792e495c972b63a754492f2a3a9f 100644
--- a/src/dnode/src/dnodeMPeer.c
+++ b/src/dnode/src/dnodeMPeer.c
@@ -15,16 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "taosmsg.h"
-#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
-#include "tglobal.h"
#include "mnode.h"
-#include "dnode.h"
-#include "dnodeInt.h"
-#include "dnodeMgmt.h"
+#include "dnodeVMgmt.h"
+#include "dnodeMInfos.h"
#include "dnodeMWrite.h"
typedef struct {
diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c
index 65f3af7b3bf13efe0ffa80ca5752d66cc4a43e9e..0fc6400d99f8dbcbc57caa2ed1265e36e18f1a93 100644
--- a/src/dnode/src/dnodeMRead.c
+++ b/src/dnode/src/dnodeMRead.c
@@ -15,16 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "taosmsg.h"
-#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
-#include "tglobal.h"
#include "mnode.h"
-#include "dnode.h"
-#include "dnodeInt.h"
-#include "dnodeMgmt.h"
+#include "dnodeVMgmt.h"
+#include "dnodeMInfos.h"
#include "dnodeMRead.h"
typedef struct {
diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c
index ef2d49ef4233eb94f3efd82f77a6918b2b671ffd..bc387e21714a61021fb8140a080937c1eed9d75c 100644
--- a/src/dnode/src/dnodeMWrite.c
+++ b/src/dnode/src/dnodeMWrite.c
@@ -15,17 +15,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "taosmsg.h"
-#include "tutil.h"
#include "ttimer.h"
#include "tqueue.h"
-#include "twal.h"
-#include "tglobal.h"
#include "mnode.h"
-#include "dnode.h"
-#include "dnodeInt.h"
-#include "dnodeMgmt.h"
+#include "dnodeVMgmt.h"
+#include "dnodeMInfos.h"
#include "dnodeMWrite.h"
typedef struct {
diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c
index 9c5c94fbbada2416ff0266bf4dd51fa2a9e0431b..246a1799f3fabfc26f0bd156673488bc172b0391 100644
--- a/src/dnode/src/dnodeMain.c
+++ b/src/dnode/src/dnodeMain.c
@@ -16,15 +16,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taos.h"
-#include "tutil.h"
#include "tconfig.h"
-#include "tglobal.h"
#include "tfile.h"
#include "twal.h"
-#include "trpc.h"
-#include "dnode.h"
-#include "dnodeInt.h"
-#include "dnodeMgmt.h"
+#include "tfs.h"
#include "dnodePeer.h"
#include "dnodeModule.h"
#include "dnodeEps.h"
@@ -33,12 +28,13 @@
#include "dnodeCheck.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
+#include "dnodeVMgmt.h"
+#include "dnodeVnodes.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeMPeer.h"
#include "dnodeShell.h"
#include "dnodeTelemetry.h"
-#include "tfs.h"
static SRunStatus tsRunStatus = TSDB_RUN_STATUS_STOPPED;
@@ -49,27 +45,28 @@ static void dnodeCheckDataDirOpenned(char *dir);
static int dnodeCreateDir(const char *dir);
static SStep tsDnodeSteps[] = {
- {"tfile", tfInit, tfCleanup},
- {"rpc", rpcInit, rpcCleanup},
- {"globalcfg" ,taosCheckGlobalCfg, NULL},
- {"storage", dnodeInitStorage, dnodeCleanupStorage},
- {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
- {"dnodeeps", dnodeInitEps, dnodeCleanupEps},
- {"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
- {"wal", walInit, walCleanUp},
- {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
- {"vread", dnodeInitVRead, dnodeCleanupVRead},
- {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
- {"mread", dnodeInitMRead, dnodeCleanupMRead},
- {"mwrite", dnodeInitMWrite, dnodeCleanupMWrite},
- {"mpeer", dnodeInitMPeer, dnodeCleanupMPeer},
- {"client", dnodeInitClient, dnodeCleanupClient},
- {"server", dnodeInitServer, dnodeCleanupServer},
- {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt},
- {"modules", dnodeInitModules, dnodeCleanupModules},
- {"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer},
- {"shell", dnodeInitShell, dnodeCleanupShell},
- {"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
+ {"dnode-tfile", tfInit, tfCleanup},
+ {"dnode-rpc", rpcInit, rpcCleanup},
+ {"dnode-globalcfg", taosCheckGlobalCfg, NULL},
+ {"dnode-storage", dnodeInitStorage, dnodeCleanupStorage},
+ {"dnode-cfg", dnodeInitCfg, dnodeCleanupCfg},
+ {"dnode-eps", dnodeInitEps, dnodeCleanupEps},
+ {"dnode-minfos", dnodeInitMInfos, dnodeCleanupMInfos},
+ {"dnode-wal", walInit, walCleanUp},
+ {"dnode-check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
+ {"dnode-vread", dnodeInitVRead, dnodeCleanupVRead},
+ {"dnode-vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
+ {"dnode-vmgmt", dnodeInitVMgmt, dnodeCleanupVMgmt},
+ {"dnode-mread", dnodeInitMRead, dnodeCleanupMRead},
+ {"dnode-mwrite", dnodeInitMWrite, dnodeCleanupMWrite},
+ {"dnode-mpeer", dnodeInitMPeer, dnodeCleanupMPeer},
+ {"dnode-client", dnodeInitClient, dnodeCleanupClient},
+ {"dnode-server", dnodeInitServer, dnodeCleanupServer},
+ {"dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes},
+ {"dnode-modules", dnodeInitModules, dnodeCleanupModules},
+ {"dnode-tmr", dnodeInitTimer, dnodeCleanupTimer},
+ {"dnode-shell", dnodeInitShell, dnodeCleanupShell},
+ {"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
};
static int dnodeCreateDir(const char *dir) {
diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c
index 7faa3c8913a7cc98cb7de8f33763d533100a7715..7ab0e72ade0fde87ccf8026e444e62b009eff138 100644
--- a/src/dnode/src/dnodeModule.c
+++ b/src/dnode/src/dnodeModule.c
@@ -15,15 +15,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taosdef.h"
-#include "taosmsg.h"
-#include "tglobal.h"
#include "mnode.h"
#include "http.h"
#include "tmqtt.h"
#include "monitor.h"
-#include "dnode.h"
-#include "dnodeInt.h"
#include "dnodeModule.h"
typedef struct {
diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c
index 9adfcc12b1f6a5f0b6910357b7afc62a0462bf19..3523656efec5e103c19b310613c7e793719344d4 100644
--- a/src/dnode/src/dnodePeer.c
+++ b/src/dnode/src/dnodePeer.c
@@ -21,12 +21,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taosmsg.h"
-#include "tglobal.h"
#include "mnode.h"
-#include "dnode.h"
-#include "dnodeInt.h"
-#include "dnodeMgmt.h"
+#include "dnodeVMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h"
@@ -45,12 +41,12 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;
diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c
index b6a13062f07bdef5d122a8446f7c5fe4a759a4bf..6ec2c30c8c7566a4e769b6ac5e26d1d8132f38a4 100644
--- a/src/dnode/src/dnodeShell.c
+++ b/src/dnode/src/dnodeShell.c
@@ -15,15 +15,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "taosdef.h"
-#include "taosmsg.h"
-#include "tglobal.h"
-#include "tutil.h"
#include "http.h"
#include "mnode.h"
-#include "dnode.h"
-#include "dnodeInt.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeMRead.h"
diff --git a/src/dnode/src/dnodeStep.c b/src/dnode/src/dnodeStep.c
index f899ce93411dc6a0e540ef5d1bf3bd588cad2620..93b4f26c703831c7bc0c65ccf4544f1808383fcb 100644
--- a/src/dnode/src/dnodeStep.c
+++ b/src/dnode/src/dnodeStep.c
@@ -15,9 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "taosmsg.h"
-#include "dnodeInt.h"
#include "dnodeStep.h"
static SStartupStep tsStartupStep;
@@ -32,14 +29,7 @@ void dnodeSendStartupStep(SRpcMsg *pMsg) {
dInfo("nettest msg is received, cont:%s", (char *)pMsg->pCont);
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
-#if 1
memcpy(pStep, &tsStartupStep, sizeof(SStartupStep));
-#else
- static int32_t step = 0;
- sprintf(pStep->name, "module:%d", step++);
- sprintf(pStep->desc, "step:%d", step++);
- if (step > 10) pStep->finished = 1;
-#endif
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c
index 56316e9619344243ab2e5fa46e0acfc6e0331e8a..a135cda055434c46bd990541ecbf02e44efc14bf 100644
--- a/src/dnode/src/dnodeSystem.c
+++ b/src/dnode/src/dnodeSystem.c
@@ -16,9 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tgrant.h"
-#include "tutil.h"
-#include "tglobal.h"
-#include "dnodeInt.h"
#include "dnodeMain.h"
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c
index e973f9901f19b7aa6f4d4958d88e01ba258a82bb..b06ed1eaf4bf65b599685a21cf670aa607db9dbd 100644
--- a/src/dnode/src/dnodeTelemetry.c
+++ b/src/dnode/src/dnodeTelemetry.c
@@ -15,9 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "tglobal.h"
-#include "tutil.h"
#include "osTime.h"
#include "tsocket.h"
#include "tbuffer.h"
@@ -32,8 +29,6 @@
#include "mnodeTable.h"
#include "mnodeSdb.h"
#include "mnodeAcct.h"
-#include "dnode.h"
-#include "dnodeInt.h"
#include "dnodeTelemetry.h"
static tsem_t tsExitSem;
diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c
index bc7e5ff33c16b0cae97ead4c0962ca4ef447ca1c..a3a22e58fd58f7e2a5026a2222d97141ca367fa8 100644
--- a/src/dnode/src/dnodeVMgmt.c
+++ b/src/dnode/src/dnodeVMgmt.c
@@ -15,65 +15,28 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "cJSON.h"
-#include "taoserror.h"
-#include "taosmsg.h"
-#include "ttimer.h"
-#include "tsdb.h"
-#include "twal.h"
#include "tqueue.h"
-#include "tsync.h"
-#include "ttimer.h"
-#include "tbn.h"
-#include "tglobal.h"
-#include "dnode.h"
-#include "vnode.h"
-#include "mnode.h"
-#include "dnodeInt.h"
-#include "dnodeMgmt.h"
-#include "dnodeEps.h"
-#include "dnodeCfg.h"
-#include "dnodeMInfos.h"
-#include "dnodeVRead.h"
-#include "dnodeVWrite.h"
-#include "dnodeModule.h"
-
-typedef struct {
- pthread_t thread;
- int32_t threadIndex;
- int32_t failed;
- int32_t opened;
- int32_t vnodeNum;
- int32_t * vnodeList;
-} SOpenVnodeThread;
+#include "dnodeVMgmt.h"
typedef struct {
SRpcMsg rpcMsg;
char pCont[];
} SMgmtMsg;
-void * tsDnodeTmr = NULL;
-static void * tsStatusTimer = NULL;
-static uint32_t tsRebootTime;
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
-static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
-static void dnodeSendStatusMsg(void *handle, void *tmrId);
-static void *dnodeProcessMgmtQueue(void *param);
-
-static int32_t dnodeOpenVnodes();
-static void dnodeCloseVnodes();
-static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
-static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
-static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
-static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
-static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
-static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
+static void * dnodeProcessMgmtQueue(void *param);
+static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
+static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
+static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
+static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
+static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
+static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
-int32_t dnodeInitMgmt() {
+int32_t dnodeInitVMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
@@ -81,27 +44,18 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
- dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
- tsRebootTime = taosGetTimestampSec();
-
int32_t code = vnodeInitMgmt();
- if (code != TSDB_CODE_SUCCESS) {
- dnodeCleanupMgmt();
- return -1;
- }
+ if (code != TSDB_CODE_SUCCESS) return -1;
- // create the queue and thread to handle the message
tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) {
dError("failed to create the mgmt queue set");
- dnodeCleanupMgmt();
return -1;
}
tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) {
dError("failed to create the mgmt queue");
- dnodeCleanupMgmt();
return -1;
}
@@ -115,62 +69,20 @@ int32_t dnodeInitMgmt() {
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
- dnodeCleanupMgmt();
- return -1;
- }
-
- code = dnodeOpenVnodes();
- if (code != TSDB_CODE_SUCCESS) {
- dnodeCleanupMgmt();
return -1;
}
dInfo("dnode mgmt is initialized");
-
- return TSDB_CODE_SUCCESS;
-}
-
-int32_t dnodeInitMgmtTimer() {
- tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
- if (tsDnodeTmr == NULL) {
- dError("failed to init dnode timer");
- dnodeCleanupMgmt();
- return -1;
- }
-
- taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
- dInfo("dnode mgmt timer is initialized");
return TSDB_CODE_SUCCESS;
}
-void dnodeSendStatusMsgToMnode() {
- if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
- dInfo("force send status msg to mnode");
- taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
- }
-}
-
-void dnodeCleanupMgmtTimer() {
- if (tsStatusTimer != NULL) {
- taosTmrStopA(&tsStatusTimer);
- tsStatusTimer = NULL;
- }
-
- if (tsDnodeTmr != NULL) {
- taosTmrCleanUp(tsDnodeTmr);
- tsDnodeTmr = NULL;
- }
-}
-
-void dnodeCleanupMgmt() {
- dnodeCleanupMgmtTimer();
- dnodeCloseVnodes();
-
+void dnodeCleanupVMgmt() {
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
if (tsQthread) pthread_join(tsQthread, NULL);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
+
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
@@ -180,9 +92,7 @@ void dnodeCleanupMgmt() {
static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
int32_t size = sizeof(SMgmtMsg) + pMsg->contLen;
SMgmtMsg *pMgmt = taosAllocateQitem(size);
- if (pMgmt == NULL) {
- return TSDB_CODE_DND_OUT_OF_MEMORY;
- }
+ if (pMgmt == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont;
@@ -192,7 +102,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
-void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
+void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) {
int32_t code = dnodeWriteToMgmtQueue(pMsg);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
@@ -233,135 +143,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
return NULL;
}
-static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
- DIR *dir = opendir(tsVnodeDir);
- if (dir == NULL) {
- return TSDB_CODE_DND_NO_WRITE_ACCESS;
- }
-
- *numOfVnodes = 0;
- struct dirent *de = NULL;
- while ((de = readdir(dir)) != NULL) {
- if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
- if (de->d_type & DT_DIR) {
- if (strncmp("vnode", de->d_name, 5) != 0) continue;
- int32_t vnode = atoi(de->d_name + 5);
- if (vnode == 0) continue;
-
- (*numOfVnodes)++;
-
- if (*numOfVnodes >= TSDB_MAX_VNODES) {
- dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
- continue;
- } else {
- vnodeList[*numOfVnodes - 1] = vnode;
- }
- }
- }
- closedir(dir);
-
- return TSDB_CODE_SUCCESS;
-}
-
-static void *dnodeOpenVnode(void *param) {
- SOpenVnodeThread *pThread = param;
-
- dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
-
- for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
- int32_t vgId = pThread->vnodeList[v];
- if (vnodeOpen(vgId) < 0) {
- dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
- pThread->failed++;
- } else {
- dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
- pThread->opened++;
- }
- }
-
- dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
- pThread->failed);
- return NULL;
-}
-
-static int32_t dnodeOpenVnodes() {
- int32_t vnodeList[TSDB_MAX_VNODES] = {0};
- int32_t numOfVnodes = 0;
- int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
-
- if (status != TSDB_CODE_SUCCESS) {
- dInfo("get dnode list failed");
- return status;
- }
-
- int32_t threadNum = tsNumOfCores;
- int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
- SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
- for (int32_t t = 0; t < threadNum; ++t) {
- threads[t].threadIndex = t;
- threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
- }
-
- for (int32_t v = 0; v < numOfVnodes; ++v) {
- int32_t t = v % threadNum;
- SOpenVnodeThread *pThread = &threads[t];
- pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
- }
-
- dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
-
- for (int32_t t = 0; t < threadNum; ++t) {
- SOpenVnodeThread *pThread = &threads[t];
- if (pThread->vnodeNum == 0) continue;
-
- pthread_attr_t thAttr;
- pthread_attr_init(&thAttr);
- pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
- if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
- dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
- }
-
- pthread_attr_destroy(&thAttr);
- }
-
- int32_t openVnodes = 0;
- int32_t failedVnodes = 0;
- for (int32_t t = 0; t < threadNum; ++t) {
- SOpenVnodeThread *pThread = &threads[t];
- if (pThread->vnodeNum > 0 && pThread->thread) {
- pthread_join(pThread->thread, NULL);
- }
- openVnodes += pThread->opened;
- failedVnodes += pThread->failed;
- free(pThread->vnodeList);
- }
-
- free(threads);
- dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
-
- return TSDB_CODE_SUCCESS;
-}
-
-static void dnodeCloseVnodes() {
- int32_t vnodeList[TSDB_MAX_VNODES]= {0};
- int32_t numOfVnodes = 0;
- int32_t status;
-
- status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
-
- if (status != TSDB_CODE_SUCCESS) {
- dInfo("get dnode list failed");
- return;
- }
-
- for (int32_t i = 0; i < numOfVnodes; ++i) {
- vnodeClose(vnodeList[i]);
- }
-
- dInfo("total vnodes:%d are all closed", numOfVnodes);
-}
-
-static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
+static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
@@ -421,15 +203,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
}
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
-// SAlterStreamMsg *pStream = pCont;
-// pStream->uid = htobe64(pStream->uid);
-// pStream->stime = htobe64(pStream->stime);
-// pStream->vnode = htonl(pStream->vnode);
-// pStream->sid = htonl(pStream->sid);
-// pStream->status = htonl(pStream->status);
-//
-// int32_t code = dnodeCreateStream(pStream);
-
return 0;
}
@@ -461,110 +234,3 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
-
-static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
- if (pMsg->code != TSDB_CODE_SUCCESS) {
- dError("status rsp is received, error:%s", tstrerror(pMsg->code));
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
- return;
- }
-
- SStatusRsp *pStatusRsp = pMsg->pCont;
- SMnodeInfos *minfos = &pStatusRsp->mnodes;
- dnodeUpdateMInfos(minfos);
-
- SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
- pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
- pCfg->moduleStatus = htonl(pCfg->moduleStatus);
- pCfg->dnodeId = htonl(pCfg->dnodeId);
- dnodeUpdateCfg(pCfg);
-
- vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
-
- SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
- dnodeUpdateEps(pEps);
-
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
-}
-
-static void dnodeSendStatusMsg(void *handle, void *tmrId) {
- if (tsDnodeTmr == NULL) {
- dError("dnode timer is already released");
- return;
- }
-
- if (tsStatusTimer == NULL) {
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
- dError("failed to start status timer");
- return;
- }
-
- int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
- SStatusMsg *pStatus = rpcMallocCont(contLen);
- if (pStatus == NULL) {
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
- dError("failed to malloc status message");
- return;
- }
-
- dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
- pStatus->dnodeId = htonl(dnodeGetDnodeId());
- pStatus->version = htonl(tsVersion);
- pStatus->lastReboot = htonl(tsRebootTime);
- pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
- pStatus->diskAvailable = tsAvailDataDirGB;
- pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
- tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
-
- // fill cluster cfg parameters
- pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes);
- pStatus->clusterCfg.enableBalance = htonl(tsEnableBalance);
- pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum);
- pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold);
- pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
- pStatus->clusterCfg.maxtablesPerVnode = htonl(tsMaxTablePerVnode);
- pStatus->clusterCfg.maxVgroupsPerDb = htonl(tsMaxVgroupsPerDb);
- tstrncpy(pStatus->clusterCfg.arbitrator, tsArbitrator, TSDB_EP_LEN);
- tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
- pStatus->clusterCfg.checkTime = 0;
- char timestr[32] = "1970-01-01 00:00:00.00";
- (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
- tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
- tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
-
- vnodeBuildStatusMsg(pStatus);
- contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
- pStatus->openVnodes = htons(pStatus->openVnodes);
-
- SRpcMsg rpcMsg = {
- .pCont = pStatus,
- .contLen = contLen,
- .msgType = TSDB_MSG_TYPE_DM_STATUS
- };
-
- SRpcEpSet epSet;
- dnodeGetEpSetForPeer(&epSet);
- dnodeSendMsgToDnode(&epSet, &rpcMsg);
-}
-
-void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
- SRpcConnInfo connInfo = {0};
- rpcGetConnInfo(rpcMsg->handle, &connInfo);
-
- SRpcEpSet epSet = {0};
- if (forShell) {
- dnodeGetEpSetForShell(&epSet);
- } else {
- dnodeGetEpSetForPeer(&epSet);
- }
-
- dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
- taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
-
- for (int i = 0; i < epSet.numOfEps; ++i) {
- dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
- epSet.port[i] = htons(epSet.port[i]);
- }
-
- rpcSendRedirectRsp(rpcMsg->handle, &epSet);
-}
diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c
index b42a627a3a78031be3191d7be5fb1202723dcc57..07496b142a210904e9842c0fe7a0ad613e41ba7a 100644
--- a/src/dnode/src/dnodeVRead.c
+++ b/src/dnode/src/dnodeVRead.c
@@ -15,12 +15,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "taosmsg.h"
-#include "tglobal.h"
#include "tqueue.h"
-#include "vnode.h"
-#include "dnodeInt.h"
+#include "dnodeVRead.h"
typedef struct {
pthread_t thread; // thread
diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c
index 6d4b50ee542b565669f0a4782142a3919c19ebe8..a5ae8ac83063c599ad4c215bd0a7fa4468810580 100644
--- a/src/dnode/src/dnodeVWrite.c
+++ b/src/dnode/src/dnodeVWrite.c
@@ -15,13 +15,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taoserror.h"
-#include "taosmsg.h"
-#include "tglobal.h"
#include "tqueue.h"
-#include "twal.h"
-#include "vnode.h"
-#include "dnodeInt.h"
+#include "dnodeVWrite.h"
typedef struct {
taos_qall qall;
diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c
new file mode 100644
index 0000000000000000000000000000000000000000..4e5ce9a0ae8a66772699a4ac6f3d166463a72b81
--- /dev/null
+++ b/src/dnode/src/dnodeVnodes.c
@@ -0,0 +1,284 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "os.h"
+#include "ttimer.h"
+#include "dnodeEps.h"
+#include "dnodeCfg.h"
+#include "dnodeMInfos.h"
+#include "dnodeVnodes.h"
+
+typedef struct {
+ pthread_t thread;
+ int32_t threadIndex;
+ int32_t failed;
+ int32_t opened;
+ int32_t vnodeNum;
+ int32_t * vnodeList;
+} SOpenVnodeThread;
+
+void * tsDnodeTmr = NULL;
+static void * tsStatusTimer = NULL;
+static uint32_t tsRebootTime = 0;
+
+static void dnodeSendStatusMsg(void *handle, void *tmrId);
+static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
+
+int32_t dnodeInitTimer() {
+ tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
+ if (tsDnodeTmr == NULL) {
+ dError("failed to init dnode timer");
+ return -1;
+ }
+
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
+
+ tsRebootTime = taosGetTimestampSec();
+ taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
+
+ dInfo("dnode timer is initialized");
+ return TSDB_CODE_SUCCESS;
+}
+
+void dnodeCleanupTimer() {
+ if (tsStatusTimer != NULL) {
+ taosTmrStopA(&tsStatusTimer);
+ tsStatusTimer = NULL;
+ }
+
+ if (tsDnodeTmr != NULL) {
+ taosTmrCleanUp(tsDnodeTmr);
+ tsDnodeTmr = NULL;
+ }
+}
+
+static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
+ DIR *dir = opendir(tsVnodeDir);
+ if (dir == NULL) return TSDB_CODE_DND_NO_WRITE_ACCESS;
+
+ *numOfVnodes = 0;
+ struct dirent *de = NULL;
+ while ((de = readdir(dir)) != NULL) {
+ if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
+ if (de->d_type & DT_DIR) {
+ if (strncmp("vnode", de->d_name, 5) != 0) continue;
+ int32_t vnode = atoi(de->d_name + 5);
+ if (vnode == 0) continue;
+
+ (*numOfVnodes)++;
+
+ if (*numOfVnodes >= TSDB_MAX_VNODES) {
+ dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
+ continue;
+ } else {
+ vnodeList[*numOfVnodes - 1] = vnode;
+ }
+ }
+ }
+ closedir(dir);
+
+ return TSDB_CODE_SUCCESS;
+}
+
+static void *dnodeOpenVnode(void *param) {
+ SOpenVnodeThread *pThread = param;
+
+ dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
+
+ for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
+ int32_t vgId = pThread->vnodeList[v];
+ if (vnodeOpen(vgId) < 0) {
+ dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
+ pThread->failed++;
+ } else {
+ dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
+ pThread->opened++;
+ }
+ }
+
+ dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
+ pThread->failed);
+ return NULL;
+}
+
+int32_t dnodeInitVnodes() {
+ int32_t vnodeList[TSDB_MAX_VNODES] = {0};
+ int32_t numOfVnodes = 0;
+ int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
+
+ if (status != TSDB_CODE_SUCCESS) {
+ dInfo("get dnode list failed");
+ return status;
+ }
+
+ int32_t threadNum = tsNumOfCores;
+ int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
+ SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
+ for (int32_t t = 0; t < threadNum; ++t) {
+ threads[t].threadIndex = t;
+ threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
+ }
+
+ for (int32_t v = 0; v < numOfVnodes; ++v) {
+ int32_t t = v % threadNum;
+ SOpenVnodeThread *pThread = &threads[t];
+ pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
+ }
+
+ dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
+
+ for (int32_t t = 0; t < threadNum; ++t) {
+ SOpenVnodeThread *pThread = &threads[t];
+ if (pThread->vnodeNum == 0) continue;
+
+ pthread_attr_t thAttr;
+ pthread_attr_init(&thAttr);
+ pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
+ if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
+ dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
+ }
+
+ pthread_attr_destroy(&thAttr);
+ }
+
+ int32_t openVnodes = 0;
+ int32_t failedVnodes = 0;
+ for (int32_t t = 0; t < threadNum; ++t) {
+ SOpenVnodeThread *pThread = &threads[t];
+ if (pThread->vnodeNum > 0 && pThread->thread) {
+ pthread_join(pThread->thread, NULL);
+ }
+ openVnodes += pThread->opened;
+ failedVnodes += pThread->failed;
+ free(pThread->vnodeList);
+ }
+
+ free(threads);
+ dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
+
+ return TSDB_CODE_SUCCESS;
+}
+
+void dnodeCleanupVnodes() {
+ int32_t vnodeList[TSDB_MAX_VNODES]= {0};
+ int32_t numOfVnodes = 0;
+ int32_t status;
+
+ status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
+
+ if (status != TSDB_CODE_SUCCESS) {
+ dInfo("get dnode list failed");
+ return;
+ }
+
+ for (int32_t i = 0; i < numOfVnodes; ++i) {
+ vnodeClose(vnodeList[i]);
+ }
+
+ dInfo("total vnodes:%d are all closed", numOfVnodes);
+}
+
+static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
+ if (pMsg->code != TSDB_CODE_SUCCESS) {
+ dError("status rsp is received, error:%s", tstrerror(pMsg->code));
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+ return;
+ }
+
+ SStatusRsp *pStatusRsp = pMsg->pCont;
+ SMnodeInfos *minfos = &pStatusRsp->mnodes;
+ dnodeUpdateMInfos(minfos);
+
+ SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
+ pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
+ pCfg->moduleStatus = htonl(pCfg->moduleStatus);
+ pCfg->dnodeId = htonl(pCfg->dnodeId);
+ dnodeUpdateCfg(pCfg);
+
+ vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
+
+ SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
+ dnodeUpdateEps(pEps);
+
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+}
+
+static void dnodeSendStatusMsg(void *handle, void *tmrId) {
+ if (tsDnodeTmr == NULL) {
+ dError("dnode timer is already released");
+ return;
+ }
+
+ if (tsStatusTimer == NULL) {
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+ dError("failed to start status timer");
+ return;
+ }
+
+ int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
+ SStatusMsg *pStatus = rpcMallocCont(contLen);
+ if (pStatus == NULL) {
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+ dError("failed to malloc status message");
+ return;
+ }
+
+ dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
+ pStatus->dnodeId = htonl(dnodeGetDnodeId());
+ pStatus->version = htonl(tsVersion);
+ pStatus->lastReboot = htonl(tsRebootTime);
+ pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
+ pStatus->diskAvailable = tsAvailDataDirGB;
+ pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
+ tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
+
+ // fill cluster cfg parameters
+ pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes);
+ pStatus->clusterCfg.enableBalance = htonl(tsEnableBalance);
+ pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum);
+ pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold);
+ pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
+ pStatus->clusterCfg.maxtablesPerVnode = htonl(tsMaxTablePerVnode);
+ pStatus->clusterCfg.maxVgroupsPerDb = htonl(tsMaxVgroupsPerDb);
+ tstrncpy(pStatus->clusterCfg.arbitrator, tsArbitrator, TSDB_EP_LEN);
+ tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
+ pStatus->clusterCfg.checkTime = 0;
+ char timestr[32] = "1970-01-01 00:00:00.00";
+ (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
+ tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
+ tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
+
+ vnodeBuildStatusMsg(pStatus);
+ contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
+ pStatus->openVnodes = htons(pStatus->openVnodes);
+
+ SRpcMsg rpcMsg = {
+ .pCont = pStatus,
+ .contLen = contLen,
+ .msgType = TSDB_MSG_TYPE_DM_STATUS
+ };
+
+ SRpcEpSet epSet;
+ dnodeGetEpSetForPeer(&epSet);
+ dnodeSendMsgToDnode(&epSet, &rpcMsg);
+}
+
+void dnodeSendStatusMsgToMnode() {
+ if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
+ dInfo("force send status msg to mnode");
+ taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
+ }
+}