提交 6f355840 编写于 作者: S Shengliang Guan

TD-2324

上级 2144ecfb
...@@ -35,6 +35,7 @@ extern int32_t tsNumOfMnodes; ...@@ -35,6 +35,7 @@ extern int32_t tsNumOfMnodes;
extern int32_t tsEnableVnodeBak; extern int32_t tsEnableVnodeBak;
extern int32_t tsEnableTelemetryReporting; extern int32_t tsEnableTelemetryReporting;
extern char tsEmail[]; extern char tsEmail[];
extern char tsArbitrator[];
// common // common
extern int tsRpcTimer; extern int tsRpcTimer;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitCfg(); int32_t dnodeInitCfg();
void dnodeCleanupCfg(); void dnodeCleanupCfg();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitCheck(); int32_t dnodeInitCheck();
void dnodeCleanupCheck(); void dnodeCleanupCheck();
......
...@@ -19,8 +19,7 @@ ...@@ -19,8 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
#include "taosmsg.h"
int32_t dnodeInitEps(); int32_t dnodeInitEps();
void dnodeCleanupEps(); void dnodeCleanupEps();
......
...@@ -19,8 +19,13 @@ ...@@ -19,8 +19,13 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h"
#include "tglobal.h"
#include "dnode.h"
#include "vnode.h"
extern int32_t dDebugFlag; extern int32_t dDebugFlag;
......
...@@ -19,8 +19,7 @@ ...@@ -19,8 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
#include "taosmsg.h"
int32_t dnodeInitMInfos(); int32_t dnodeInitMInfos();
void dnodeCleanupMInfos(); void dnodeCleanupMInfos();
...@@ -29,6 +28,10 @@ void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet); ...@@ -29,6 +28,10 @@ void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetMInfos(SMnodeInfos *minfos); void dnodeGetMInfos(SMnodeInfos *minfos);
bool dnodeIsMasterEp(char *ep); bool dnodeIsMasterEp(char *ep);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitMPeer(); int32_t dnodeInitMPeer();
void dnodeCleanupMPeer(); void dnodeCleanupMPeer();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitMRead(); int32_t dnodeInitMRead();
void dnodeCleanupMRead(); void dnodeCleanupMRead();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitMWrite(); int32_t dnodeInitMWrite();
void dnodeCleanupMWrite(); void dnodeCleanupMWrite();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitSystem(); int32_t dnodeInitSystem();
void dnodeCleanUpSystem(); void dnodeCleanUpSystem();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitModules(); int32_t dnodeInitModules();
void dnodeStartModules(); void dnodeStartModules();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitServer(); int32_t dnodeInitServer();
void dnodeCleanupServer(); void dnodeCleanupServer();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitShell(); int32_t dnodeInitShell();
void dnodeCleanupShell(); void dnodeCleanupShell();
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnode.h" #include "dnodeInt.h"
int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize); int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize);
void dnodeStepCleanup(SStep *pSteps, int32_t stepSize); void dnodeStepCleanup(SStep *pSteps, int32_t stepSize);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitTelemetry(); int32_t dnodeInitTelemetry();
void dnodeCleanupTelemetry(); void dnodeCleanupTelemetry();
......
...@@ -13,32 +13,17 @@ ...@@ -13,32 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_DNODE_MGMT_H #ifndef TDENGINE_DNODE_VMGMT_H
#define TDENGINE_DNODE_MGMT_H #define TDENGINE_DNODE_VMGMT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
#include "trpc.h" int32_t dnodeInitVMgmt();
void dnodeCleanupVMgmt();
int32_t dnodeInitMgmt(); void dnodeDispatchToVMgmtQueue(SRpcMsg *rpcMsg);
void dnodeCleanupMgmt();
int32_t dnodeInitMgmtTimer();
void dnodeCleanupMgmtTimer();
void dnodeDispatchToMgmtQueue(SRpcMsg *rpcMsg);
void* dnodeGetVnode(int32_t vgId);
int32_t dnodeGetVnodeStatus(void *pVnode);
void* dnodeGetVnodeRworker(void *pVnode);
void* dnodeGetVnodeWworker(void *pVnode);
void* dnodeGetVnodeWal(void *pVnode);
void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitVRead(); int32_t dnodeInitVRead();
void dnodeCleanupVRead(); void dnodeCleanupVRead();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h"
int32_t dnodeInitVWrite(); int32_t dnodeInitVWrite();
void dnodeCleanupVWrite(); void dnodeCleanupVWrite();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_VNODES_H
#define TDENGINE_DNODE_VNODES_H
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
int32_t dnodeInitTimer();
void dnodeCleanupTimer();
void dnodeSendStatusMsgToMnode();
#ifdef __cplusplus
}
#endif
#endif
...@@ -16,9 +16,6 @@ ...@@ -16,9 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "tglobal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeCfg.h" #include "dnodeCfg.h"
static SDnodeCfg tsCfg = {0}; static SDnodeCfg tsCfg = {0};
......
...@@ -15,8 +15,6 @@ ...@@ -15,8 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tglobal.h"
#include "dnodeInt.h"
#include "dnodeCheck.h" #include "dnodeCheck.h"
typedef struct { typedef struct {
......
...@@ -16,10 +16,7 @@ ...@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "tglobal.h"
#include "hash.h" #include "hash.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeEps.h" #include "dnodeEps.h"
static SDnodeEps *tsEps = NULL; static SDnodeEps *tsEps = NULL;
......
...@@ -16,10 +16,7 @@ ...@@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "tglobal.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMInfos.h" #include "dnodeMInfos.h"
static SMnodeInfos tsMInfos; static SMnodeInfos tsMInfos;
...@@ -286,3 +283,25 @@ static int32_t dnodeWriteMInfos() { ...@@ -286,3 +283,25 @@ static int32_t dnodeWriteMInfos() {
dInfo("successed to write %s", file); dInfo("successed to write %s", file);
return 0; return 0;
} }
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
epSet.port[i] = htons(epSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
}
...@@ -15,16 +15,11 @@ ...@@ -15,16 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "tglobal.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h" #include "dnodeVMgmt.h"
#include "dnodeInt.h" #include "dnodeMInfos.h"
#include "dnodeMgmt.h"
#include "dnodeMWrite.h" #include "dnodeMWrite.h"
typedef struct { typedef struct {
......
...@@ -15,16 +15,11 @@ ...@@ -15,16 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "tglobal.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h" #include "dnodeVMgmt.h"
#include "dnodeInt.h" #include "dnodeMInfos.h"
#include "dnodeMgmt.h"
#include "dnodeMRead.h" #include "dnodeMRead.h"
typedef struct { typedef struct {
......
...@@ -15,17 +15,11 @@ ...@@ -15,17 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "ttimer.h" #include "ttimer.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h" #include "dnodeVMgmt.h"
#include "dnodeInt.h" #include "dnodeMInfos.h"
#include "dnodeMgmt.h"
#include "dnodeMWrite.h" #include "dnodeMWrite.h"
typedef struct { typedef struct {
......
...@@ -16,15 +16,10 @@ ...@@ -16,15 +16,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "tutil.h"
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h"
#include "tfile.h" #include "tfile.h"
#include "twal.h" #include "twal.h"
#include "trpc.h" #include "tfs.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodePeer.h" #include "dnodePeer.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeEps.h" #include "dnodeEps.h"
...@@ -33,12 +28,13 @@ ...@@ -33,12 +28,13 @@
#include "dnodeCheck.h" #include "dnodeCheck.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeVMgmt.h"
#include "dnodeVnodes.h"
#include "dnodeMRead.h" #include "dnodeMRead.h"
#include "dnodeMWrite.h" #include "dnodeMWrite.h"
#include "dnodeMPeer.h" #include "dnodeMPeer.h"
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeTelemetry.h" #include "dnodeTelemetry.h"
#include "tfs.h"
static SRunStatus tsRunStatus = TSDB_RUN_STATUS_STOPPED; static SRunStatus tsRunStatus = TSDB_RUN_STATUS_STOPPED;
...@@ -49,27 +45,28 @@ static void dnodeCheckDataDirOpenned(char *dir); ...@@ -49,27 +45,28 @@ static void dnodeCheckDataDirOpenned(char *dir);
static int dnodeCreateDir(const char *dir); static int dnodeCreateDir(const char *dir);
static SStep tsDnodeSteps[] = { static SStep tsDnodeSteps[] = {
{"tfile", tfInit, tfCleanup}, {"dnode-tfile", tfInit, tfCleanup},
{"rpc", rpcInit, rpcCleanup}, {"dnode-rpc", rpcInit, rpcCleanup},
{"globalcfg" ,taosCheckGlobalCfg, NULL}, {"dnode-globalcfg", taosCheckGlobalCfg, NULL},
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"dnode-storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, {"dnode-cfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps}, {"dnode-eps", dnodeInitEps, dnodeCleanupEps},
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos}, {"dnode-minfos", dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp}, {"dnode-wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"dnode-check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVRead, dnodeCleanupVRead}, {"dnode-vread", dnodeInitVRead, dnodeCleanupVRead},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite}, {"dnode-vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMRead, dnodeCleanupMRead}, {"dnode-vmgmt", dnodeInitVMgmt, dnodeCleanupVMgmt},
{"mwrite", dnodeInitMWrite, dnodeCleanupMWrite}, {"dnode-mread", dnodeInitMRead, dnodeCleanupMRead},
{"mpeer", dnodeInitMPeer, dnodeCleanupMPeer}, {"dnode-mwrite", dnodeInitMWrite, dnodeCleanupMWrite},
{"client", dnodeInitClient, dnodeCleanupClient}, {"dnode-mpeer", dnodeInitMPeer, dnodeCleanupMPeer},
{"server", dnodeInitServer, dnodeCleanupServer}, {"dnode-client", dnodeInitClient, dnodeCleanupClient},
{"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, {"dnode-server", dnodeInitServer, dnodeCleanupServer},
{"modules", dnodeInitModules, dnodeCleanupModules}, {"dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes},
{"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer}, {"dnode-modules", dnodeInitModules, dnodeCleanupModules},
{"shell", dnodeInitShell, dnodeCleanupShell}, {"dnode-tmr", dnodeInitTimer, dnodeCleanupTimer},
{"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry}, {"dnode-shell", dnodeInitShell, dnodeCleanupShell},
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
}; };
static int dnodeCreateDir(const char *dir) { static int dnodeCreateDir(const char *dir) {
......
...@@ -15,15 +15,10 @@ ...@@ -15,15 +15,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "mnode.h" #include "mnode.h"
#include "http.h" #include "http.h"
#include "tmqtt.h" #include "tmqtt.h"
#include "monitor.h" #include "monitor.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeModule.h" #include "dnodeModule.h"
typedef struct { typedef struct {
......
...@@ -21,12 +21,8 @@ ...@@ -21,12 +21,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h" #include "dnodeVMgmt.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMPeer.h" #include "dnodeMPeer.h"
#include "dnodeMInfos.h" #include "dnodeMInfos.h"
...@@ -45,12 +41,12 @@ int32_t dnodeInitServer() { ...@@ -45,12 +41,12 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;
......
...@@ -15,15 +15,8 @@ ...@@ -15,15 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tutil.h"
#include "http.h" #include "http.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMRead.h" #include "dnodeMRead.h"
......
...@@ -15,9 +15,6 @@ ...@@ -15,9 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "dnodeInt.h"
#include "dnodeStep.h" #include "dnodeStep.h"
static SStartupStep tsStartupStep; static SStartupStep tsStartupStep;
...@@ -32,14 +29,7 @@ void dnodeSendStartupStep(SRpcMsg *pMsg) { ...@@ -32,14 +29,7 @@ void dnodeSendStartupStep(SRpcMsg *pMsg) {
dInfo("nettest msg is received, cont:%s", (char *)pMsg->pCont); dInfo("nettest msg is received, cont:%s", (char *)pMsg->pCont);
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
#if 1
memcpy(pStep, &tsStartupStep, sizeof(SStartupStep)); memcpy(pStep, &tsStartupStep, sizeof(SStartupStep));
#else
static int32_t step = 0;
sprintf(pStep->name, "module:%d", step++);
sprintf(pStep->desc, "step:%d", step++);
if (step > 10) pStep->finished = 1;
#endif
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
......
...@@ -16,9 +16,6 @@ ...@@ -16,9 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tgrant.h" #include "tgrant.h"
#include "tutil.h"
#include "tglobal.h"
#include "dnodeInt.h"
#include "dnodeMain.h" #include "dnodeMain.h"
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
......
...@@ -15,9 +15,6 @@ ...@@ -15,9 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "tutil.h"
#include "osTime.h" #include "osTime.h"
#include "tsocket.h" #include "tsocket.h"
#include "tbuffer.h" #include "tbuffer.h"
...@@ -32,8 +29,6 @@ ...@@ -32,8 +29,6 @@
#include "mnodeTable.h" #include "mnodeTable.h"
#include "mnodeSdb.h" #include "mnodeSdb.h"
#include "mnodeAcct.h" #include "mnodeAcct.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeTelemetry.h" #include "dnodeTelemetry.h"
static tsem_t tsExitSem; static tsem_t tsExitSem;
......
...@@ -15,65 +15,28 @@ ...@@ -15,65 +15,28 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "cJSON.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "ttimer.h"
#include "tsdb.h"
#include "twal.h"
#include "tqueue.h" #include "tqueue.h"
#include "tsync.h" #include "dnodeVMgmt.h"
#include "ttimer.h"
#include "tbn.h"
#include "tglobal.h"
#include "dnode.h"
#include "vnode.h"
#include "mnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeEps.h"
#include "dnodeCfg.h"
#include "dnodeMInfos.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeModule.h"
typedef struct {
pthread_t thread;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
int32_t * vnodeList;
} SOpenVnodeThread;
typedef struct { typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
char pCont[]; char pCont[];
} SMgmtMsg; } SMgmtMsg;
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static taos_qset tsMgmtQset = NULL; static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL; static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread; static pthread_t tsQthread;
static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void * dnodeProcessMgmtQueue(void *param);
static void dnodeSendStatusMsg(void *handle, void *tmrId); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static void *dnodeProcessMgmtQueue(void *param); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeOpenVnodes(); static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static void dnodeCloseVnodes(); static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() { int32_t dnodeInitVMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
...@@ -81,27 +44,18 @@ int32_t dnodeInitMgmt() { ...@@ -81,27 +44,18 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
tsRebootTime = taosGetTimestampSec();
int32_t code = vnodeInitMgmt(); int32_t code = vnodeInitMgmt();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) return -1;
dnodeCleanupMgmt();
return -1;
}
// create the queue and thread to handle the message
tsMgmtQset = taosOpenQset(); tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) { if (tsMgmtQset == NULL) {
dError("failed to create the mgmt queue set"); dError("failed to create the mgmt queue set");
dnodeCleanupMgmt();
return -1; return -1;
} }
tsMgmtQueue = taosOpenQueue(); tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) { if (tsMgmtQueue == NULL) {
dError("failed to create the mgmt queue"); dError("failed to create the mgmt queue");
dnodeCleanupMgmt();
return -1; return -1;
} }
...@@ -115,62 +69,20 @@ int32_t dnodeInitMgmt() { ...@@ -115,62 +69,20 @@ int32_t dnodeInitMgmt() {
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
if (code != 0) { if (code != 0) {
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno)); dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
dnodeCleanupMgmt();
return -1;
}
code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt();
return -1; return -1;
} }
dInfo("dnode mgmt is initialized"); dInfo("dnode mgmt is initialized");
return TSDB_CODE_SUCCESS;
}
int32_t dnodeInitMgmtTimer() {
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
dnodeCleanupMgmt();
return -1;
}
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
dInfo("dnode mgmt timer is initialized");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeSendStatusMsgToMnode() { void dnodeCleanupVMgmt() {
if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
dInfo("force send status msg to mnode");
taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
}
}
void dnodeCleanupMgmtTimer() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
}
void dnodeCleanupMgmt() {
dnodeCleanupMgmtTimer();
dnodeCloseVnodes();
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset); if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset);
if (tsQthread) pthread_join(tsQthread, NULL); if (tsQthread) pthread_join(tsQthread, NULL);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue); if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset); if (tsMgmtQset) taosCloseQset(tsMgmtQset);
tsMgmtQset = NULL; tsMgmtQset = NULL;
tsMgmtQueue = NULL; tsMgmtQueue = NULL;
...@@ -180,9 +92,7 @@ void dnodeCleanupMgmt() { ...@@ -180,9 +92,7 @@ void dnodeCleanupMgmt() {
static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
int32_t size = sizeof(SMgmtMsg) + pMsg->contLen; int32_t size = sizeof(SMgmtMsg) + pMsg->contLen;
SMgmtMsg *pMgmt = taosAllocateQitem(size); SMgmtMsg *pMgmt = taosAllocateQitem(size);
if (pMgmt == NULL) { if (pMgmt == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
pMgmt->rpcMsg = *pMsg; pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont; pMgmt->rpcMsg.pCont = pMgmt->pCont;
...@@ -192,7 +102,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { ...@@ -192,7 +102,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) {
int32_t code = dnodeWriteToMgmtQueue(pMsg); int32_t code = dnodeWriteToMgmtQueue(pMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = code}; SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
...@@ -233,135 +143,7 @@ static void *dnodeProcessMgmtQueue(void *param) { ...@@ -233,135 +143,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
return NULL; return NULL;
} }
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) {
return TSDB_CODE_DND_NO_WRITE_ACCESS;
}
*numOfVnodes = 0;
struct dirent *de = NULL;
while ((de = readdir(dir)) != NULL) {
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
if (de->d_type & DT_DIR) {
if (strncmp("vnode", de->d_name, 5) != 0) continue;
int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue;
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = vnode;
}
}
}
closedir(dir);
return TSDB_CODE_SUCCESS;
}
static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
if (vnodeOpen(vgId) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
}
dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t dnodeOpenVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
return status;
}
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SOpenVnodeThread *pThread = &threads[t];
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
}
int32_t openVnodes = 0;
int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && pThread->thread) {
pthread_join(pThread->thread, NULL);
}
openVnodes += pThread->opened;
failedVnodes += pThread->failed;
free(pThread->vnodeList);
}
free(threads);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
return TSDB_CODE_SUCCESS;
}
static void dnodeCloseVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0;
int32_t status;
status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeClose(vnodeList[i]);
}
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont; SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion); pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
...@@ -421,15 +203,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -421,15 +203,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
} }
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) { static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// SAlterStreamMsg *pStream = pCont;
// pStream->uid = htobe64(pStream->uid);
// pStream->stime = htobe64(pStream->stime);
// pStream->vnode = htonl(pStream->vnode);
// pStream->sid = htonl(pStream->sid);
// pStream->status = htonl(pStream->status);
//
// int32_t code = dnodeCreateStream(pStream);
return 0; return 0;
} }
...@@ -461,110 +234,3 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { ...@@ -461,110 +234,3 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SStatusRsp *pStatusRsp = pMsg->pCont;
SMnodeInfos *minfos = &pStatusRsp->mnodes;
dnodeUpdateMInfos(minfos);
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->moduleStatus = htonl(pCfg->moduleStatus);
pCfg->dnodeId = htonl(pCfg->dnodeId);
dnodeUpdateCfg(pCfg);
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
dnodeUpdateEps(pEps);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
return;
}
if (tsStatusTimer == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to start status timer");
return;
}
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message");
return;
}
dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
pStatus->dnodeId = htonl(dnodeGetDnodeId());
pStatus->version = htonl(tsVersion);
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
// fill cluster cfg parameters
pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes);
pStatus->clusterCfg.enableBalance = htonl(tsEnableBalance);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum);
pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold);
pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
pStatus->clusterCfg.maxtablesPerVnode = htonl(tsMaxTablePerVnode);
pStatus->clusterCfg.maxVgroupsPerDb = htonl(tsMaxVgroupsPerDb);
tstrncpy(pStatus->clusterCfg.arbitrator, tsArbitrator, TSDB_EP_LEN);
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
pStatus->clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
.pCont = pStatus,
.contLen = contLen,
.msgType = TSDB_MSG_TYPE_DM_STATUS
};
SRpcEpSet epSet;
dnodeGetEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, &rpcMsg);
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
for (int i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
epSet.port[i] = htons(epSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
}
...@@ -15,12 +15,8 @@ ...@@ -15,12 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "vnode.h" #include "dnodeVRead.h"
#include "dnodeInt.h"
typedef struct { typedef struct {
pthread_t thread; // thread pthread_t thread; // thread
......
...@@ -15,13 +15,8 @@ ...@@ -15,13 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "dnodeVWrite.h"
#include "vnode.h"
#include "dnodeInt.h"
typedef struct { typedef struct {
taos_qall qall; taos_qall qall;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "ttimer.h"
#include "dnodeEps.h"
#include "dnodeCfg.h"
#include "dnodeMInfos.h"
#include "dnodeVnodes.h"
typedef struct {
pthread_t thread;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
int32_t * vnodeList;
} SOpenVnodeThread;
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime = 0;
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
int32_t dnodeInitTimer() {
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
tsRebootTime = taosGetTimestampSec();
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
dInfo("dnode timer is initialized");
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupTimer() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
}
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) return TSDB_CODE_DND_NO_WRITE_ACCESS;
*numOfVnodes = 0;
struct dirent *de = NULL;
while ((de = readdir(dir)) != NULL) {
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
if (de->d_type & DT_DIR) {
if (strncmp("vnode", de->d_name, 5) != 0) continue;
int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue;
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = vnode;
}
}
}
closedir(dir);
return TSDB_CODE_SUCCESS;
}
static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
if (vnodeOpen(vgId) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
}
dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
int32_t dnodeInitVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
return status;
}
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SOpenVnodeThread *pThread = &threads[t];
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
}
int32_t openVnodes = 0;
int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && pThread->thread) {
pthread_join(pThread->thread, NULL);
}
openVnodes += pThread->opened;
failedVnodes += pThread->failed;
free(pThread->vnodeList);
}
free(threads);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0;
int32_t status;
status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeClose(vnodeList[i]);
}
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SStatusRsp *pStatusRsp = pMsg->pCont;
SMnodeInfos *minfos = &pStatusRsp->mnodes;
dnodeUpdateMInfos(minfos);
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->moduleStatus = htonl(pCfg->moduleStatus);
pCfg->dnodeId = htonl(pCfg->dnodeId);
dnodeUpdateCfg(pCfg);
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
dnodeUpdateEps(pEps);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
return;
}
if (tsStatusTimer == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to start status timer");
return;
}
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message");
return;
}
dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
pStatus->dnodeId = htonl(dnodeGetDnodeId());
pStatus->version = htonl(tsVersion);
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
// fill cluster cfg parameters
pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes);
pStatus->clusterCfg.enableBalance = htonl(tsEnableBalance);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum);
pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold);
pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
pStatus->clusterCfg.maxtablesPerVnode = htonl(tsMaxTablePerVnode);
pStatus->clusterCfg.maxVgroupsPerDb = htonl(tsMaxVgroupsPerDb);
tstrncpy(pStatus->clusterCfg.arbitrator, tsArbitrator, TSDB_EP_LEN);
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
pStatus->clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
.pCont = pStatus,
.contLen = contLen,
.msgType = TSDB_MSG_TYPE_DM_STATUS
};
SRpcEpSet epSet;
dnodeGetEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, &rpcMsg);
}
void dnodeSendStatusMsgToMnode() {
if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
dInfo("force send status msg to mnode");
taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册