提交 0ca664d8 编写于 作者: S slguan

#1177

上级 abb7731a
......@@ -3,7 +3,7 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
......@@ -20,8 +20,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(taosd dcluster)
ELSEIF (TD_LITE)
TARGET_LINK_LIBRARIES(taosd dlite)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd")
......
......@@ -13,13 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEMGMT_H
#define TDENGINE_VNODEMGMT_H
#ifndef TDENGINE_DNODE_MGMT_H
#define TDENGINE_DNODE_MGMT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tsched.h"
typedef struct {
char id[20];
char sid;
......@@ -31,8 +33,31 @@ typedef struct {
int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj);
void mgmtProcessMsgFromDnodeSpec(SSchedMsg *sched);
extern void *dmQhandle;
extern char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
extern char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
extern char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type);
extern char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type);
extern int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen);
extern int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code);
extern void (*dnodeInitMgmtIp)();
extern int (*dnodeInitMgmtConn)();
char *taosBuildRspMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size);
char *taosBuildReqMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size);
char *taosBuildRspMsgToMnodeEdgeImp(SMgmtObj *pObj, char type);
char *taosBuildReqMsgToMnodeEdgeImp(SMgmtObj *pObj, char type);
int taosSendMsgToMnodeEdgeImp(SMgmtObj *pObj, char *msg, int msgLen);
int taosSendSimpleRspToMnodeEdgeImp(SMgmtObj *pObj, char rsptype, char code);
void dnodeInitMgmtIpEdgeImp();
void* dnodeProcessMsgFromMgmtEdgeImp(SSchedMsg *sched);
int dnodeInitMgmtConnEdgeImp();
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEMGMT_H
#endif
......@@ -13,29 +13,49 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_PLUGIN_H
#define TDENGINE_DNODE_PLUGIN_H
#ifndef TDENGINE_DNODE_MODULE_H
#define TDENGINE_DNODE_MODULE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include "tsched.h"
#include "mgmt.h"
#define tsetModuleStatus(mod) \
{ tsModuleStatus |= (1 << mod); }
#define tclearModuleStatus(mod) \
{ tsModuleStatus &= ~(1 << mod); }
char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type);
char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type);
int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen);
int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code);
enum _module {
TSDB_MOD_MGMT,
TSDB_MOD_HTTP,
TSDB_MOD_MONITOR,
TSDB_MOD_MAX
};
void (*dnodeInitMgmtIp)();
void (*dnodeProcessMsgFromMgmt)(SSchedMsg *sched);
int (*dnodeInitMgmtConn)();
typedef struct {
char *name;
int (*initFp)();
void (*cleanUpFp)();
int (*startFp)();
void (*stopFp)();
int num;
int curNum;
int equalVnodeNum;
} SModule;
extern uint32_t tsModuleStatus;
extern SModule tsModule[];
void dnodeAllocModules();
int32_t dnodeInitModules();
void dnodeCleanUpModules();
extern void (*dnodeStartModules)();
void dnodeStartModulesEdgeImp();
#ifdef __cplusplus
}
......
......@@ -13,26 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEMGMT_H
#define TDENGINE_VNODEMGMT_H
#ifndef TDENGINE_DNODE_SERVICE_H
#define TDENGINE_DNODE_SERVICE_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
char id[20];
char sid;
void *thandle;
int mgmtIndex;
char status; // 0:offline, 1:online
} SMgmtObj;
int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
void (*dnodeParseParameterK)();
void dnodeParseParameterKComImp();
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEMGMT_H
#endif
......@@ -13,59 +13,32 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODESYSTEM_H
#define TDENGINE_DNODESYSTEM_H
#ifndef TDENGINE_DNODE_SYSTEM_H
#define TDENGINE_DNODE_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#define tsetModuleStatus(mod) \
{ tsModuleStatus |= (1 << mod); }
#define tclearModuleStatus(mod) \
{ tsModuleStatus &= ~(1 << mod); }
enum _module { TSDB_MOD_MGMT, TSDB_MOD_HTTP, TSDB_MOD_MONITOR, TSDB_MOD_MAX };
typedef struct {
char *name;
int (*initFp)();
void (*cleanUpFp)();
int (*startFp)();
void (*stopFp)();
int num;
int curNum;
int equalVnodeNum;
} SModule;
extern uint32_t tsModuleStatus;
extern SModule tsModule[];
extern pthread_mutex_t dmutex;
extern bool tsDnodeStopping;
extern int (*dnodeInitStorage)();
extern void (*dnodeCleanupStorage)();
extern int (*dnodeCheckSystem)();
void dnodeCleanUpSystem();
int dnodeInitSystem();
int dnodeInitSystemSpec();
void dnodeStartModuleSpec();
void dnodeParseParameterK();
void dnodeProcessModuleStatus(uint32_t status);
void dnodeResetSystem();
void dnodeCleanUpSystem();
void dnodeCheckDbRunning(const char* dir);
void vnodeCleanUpSystem();
int vnodeInitSystem();
void dnodeInitMgmtIp();
void vnodeInitQHandle();
int mgmtInitSystem();
void mgmtCleanUpSystem();
int mgmtStartSystem();
void mgmtStopSystem();
int taosCreateTierDirectory();
void taosCleanupTier();
void vnodePrintSystemInfo();
int vnodeCfgDynamicOptions(char *msg);
int vnodeInitStore();
int vnodeInitPeer(int numOfThreads);
#ifdef __cplusplus
}
......
......@@ -18,12 +18,13 @@
#include "os.h"
#include "dnodeSystem.h"
#include "dnodeMgmt.h"
#include "taosmsg.h"
#include "trpc.h"
#include "tsched.h"
#include "tsystem.h"
#include "vnode.h"
#include "vnodeMgmt.h"
#include "vnodeSystem.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
......@@ -42,16 +43,93 @@ void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables);
void vnodeOpenVnode(int vnode);
void vnodeCleanUpOneVnode(int vnode);
int vnodeSaveCreateMsgIntoQueue(SVnodeObj *pVnode, char *pMsg, int msgLen);
char *taosBuildRspMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size);
char *taosBuildReqMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size);
char *taosBuildRspMsgToMnode(SMgmtObj *pObj, char type);
char *taosBuildReqMsgToMnode(SMgmtObj *pObj, char type);
int taosSendSimpleRspToMnode(SMgmtObj *pObj, char rsptype, char code);
int taosSendMsgToMnode(SMgmtObj *pObj, char *msg, int msgLen);
char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = NULL;
char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size) = NULL;
char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type) = NULL;
char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type) = NULL;
int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen) = NULL;
int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = NULL;
void (*dnodeInitMgmtIp)() = NULL;
int (*dnodeInitMgmtConn)() = NULL;
char *taosBuildRspMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
*pStart = type;
return pStart + 1;
}
char *taosBuildReqMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
*pStart = type;
return pStart + 1;
}
char *taosBuildRspMsgToMnodeEdgeImp(SMgmtObj *pObj, char type) {
return taosBuildRspMsgToMnodeWithSize(pObj, type, 256);
}
void dnodeProcessMsgFromMgmtImp(char *content, int msgLen, int msgType, SMgmtObj *pObj) {
char *taosBuildReqMsgToMnodeEdgeImp(SMgmtObj *pObj, char type) {
return taosBuildReqMsgToMnodeWithSize(pObj, type, 256);
}
int taosSendMsgToMnodeEdgeImp(SMgmtObj *pObj, char *msg, int msgLen) {
dTrace("msg:%s is sent to mnode", taosMsg[(uint8_t)(*(msg-1))]);
/*
* Lite version has no message header, so minus one
*/
SSchedMsg schedMsg;
schedMsg.fp = mgmtProcessMsgFromDnodeSpec;
schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL;
schedMsg.thandle = NULL;
taosScheduleTask(dmQhandle, &schedMsg);
return 0;
}
int taosSendSimpleRspToMnodeEdgeImp(SMgmtObj *pObj, char rsptype, char code) {
char *pStart = taosBuildRspMsgToMnode(0, rsptype);
if (pStart == NULL) {
return 0;
}
*pStart = code;
taosSendMsgToMnode(0, pStart, code);
return 0;
}
void* dnodeProcessMsgFromMgmtEdgeImp(SSchedMsg *sched) {
char msgType = *sched->msg;
char *content = sched->msg + 1;
dTrace("msg:%s is received from mgmt", taosMsg[(uint8_t)msgType]);
dnodeDistributeMsgFromMgmt(content, 0, msgType, 0);
free(sched->msg);
return NULL;
}
int dnodeInitMgmtConnEdgeImp() {
return 0;
}
void dnodeInitMgmtIpEdgeImp() {}
void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj) {
if (msgType == TSDB_MSG_TYPE_CREATE) {
vnodeProcessCreateMeterRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_VPEERS) {
......@@ -116,12 +194,12 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
goto _over;
}
if (pVnode->syncStatus == TSDB_VN_SYNC_STATUS_SYNCING) {
code = vnodeSaveCreateMsgIntoQueue(pVnode, pMsg, msgLen);
dTrace("vid:%d, create msg is saved into sync queue", vid);
} else {
// if (pVnode->syncStatus == TSDB_VN_SYNC_STATUS_SYNCING) {
// code = vnodeSaveCreateMsgIntoQueue(pVnode, pMsg, msgLen);
// dTrace("vid:%d, create msg is saved into sync queue", vid);
// } else {
code = vnodeProcessCreateMeterMsg(pMsg, msgLen);
}
// }
_over:
taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_CREATE_RSP, code);
......@@ -512,3 +590,4 @@ int vnodeSendMeterCfgMsg(int vnode, int sid) {
msgLen = pMsg - pStart;
return taosSendMsgToMnode(pObj, pStart, msgLen);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "tglobalcfg.h"
#include "dnodeModule.h"
#include "dnodeSystem.h"
#include "monitorSystem.h"
#include "httpSystem.h"
#include "mgmtSystem.h"
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
void (*dnodeStartModules)() = NULL;
void dnodeAllocModules() {
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers;
tsModule[TSDB_MOD_MGMT].curNum = 0;
tsModule[TSDB_MOD_MGMT].equalVnodeNum = tsMgmtEqualVnodeNum;
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0;
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0;
}
void dnodeCleanUpModules() {
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].stopFp) {
(*tsModule[mod].stopFp)();
}
if (tsModule[mod].num != 0 && tsModule[mod].cleanUpFp) {
(*tsModule[mod].cleanUpFp)();
}
}
if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
}
void dnodeProcessModuleStatus(uint32_t status) {
if (tsDnodeStopping) {
return;
}
int news = status;
int olds = tsModuleStatus;
for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
int newStatus = news & (1 << moduleType);
int oldStatus = olds & (1 << moduleType);
if (oldStatus > 0) {
if (newStatus == 0) {
if (tsModule[moduleType].stopFp) {
dPrint("module:%s is stopped on this node", tsModule[moduleType].name);
(*tsModule[moduleType].stopFp)();
}
}
} else if (oldStatus == 0) {
if (newStatus > 0) {
if (tsModule[moduleType].startFp) {
dPrint("module:%s is started on this node", tsModule[moduleType].name);
(*tsModule[moduleType].startFp)();
}
}
} else {
}
}
tsModuleStatus = status;
}
int32_t dnodeInitModules() {
for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].initFp) {
if ((*tsModule[mod].initFp)() != 0) {
dError("TDengine initialization failed");
return -1;
}
}
}
return 0;
}
void dnodeStartModulesEdgeImp() {
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].startFp) {
if ((*tsModule[mod].startFp)() != 0) {
dError("failed to start module:%d", mod);
}
}
}
if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodePlugin.h"
#include "dnodeMgmt.h"
#include "vnode.h"
#include "vnodeMgmt.h"
char *taosBuildRspMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
*pStart = type;
return pStart + 1;
}
char *taosBuildReqMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size);
if (pStart == NULL) {
return NULL;
}
*pStart = type;
return pStart + 1;
}
char *taosBuildRspMsgToMnodeEdgeImp(SMgmtObj *pObj, char type) {
return taosBuildRspMsgToMnodeWithSize(pObj, type, 256);
}
char *taosBuildReqMsgToMnodeEdgeImp(SMgmtObj *pObj, char type) {
return taosBuildReqMsgToMnodeWithSize(pObj, type, 256);
}
int taosSendMsgToMnodeEdgeImp(SMgmtObj *pObj, char *msg, int msgLen) {
dTrace("msg:%s is sent to mnode", taosMsg[(uint8_t)(*(msg-1))]);
/*
* Lite version has no message header, so minus one
*/
SSchedMsg schedMsg;
schedMsg.fp = mgmtProcessMsgFromDnodeSpec;
schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL;
schedMsg.thandle = NULL;
taosScheduleTask(dmQhandle, &schedMsg);
return 0;
}
int taosSendSimpleRspToMnodeEdgeImp(SMgmtObj *pObj, char rsptype, char code) {
char *pStart = taosBuildRspMsgToMnode(0, rsptype);
if (pStart == NULL) {
return 0;
}
*pStart = code;
taosSendMsgToMnode(0, pStart, code);
return 0;
}
void dnodeProcessMsgFromMgmtEdgeImp(SSchedMsg *sched) {
char msgType = *sched->msg;
char *content = sched->msg + 1;
dTrace("msg:%s is received from mgmt", taosMsg[(uint8_t)msgType]);
dnodeProcessMsgFromMgmtImp(content, 0, msgType, 0);
free(sched->msg);
}
int dnodeInitMgmtConnEdgeImp() {
return 0;
}
void dnodeInitMgmtIpEdgeImp() {}
void dnodeInitPlugin() {
dnodeInitMgmtConn = dnodeInitMgmtConnEdgeImp;
dnodeInitMgmtIp = dnodeInitMgmtIpEdgeImp;
dnodeProcessMsgFromMgmt = dnodeProcessMsgFromMgmtEdgeImp;
taosBuildRspMsgToMnodeWithSize = taosBuildRspMsgToMnodeWithSizeEdgeImp;
taosBuildReqMsgToMnodeWithSize = taosBuildReqMsgToMnodeWithSizeEdgeImp;
taosBuildRspMsgToMnode = taosBuildRspMsgToMnodeEdgeImp;
taosBuildReqMsgToMnode = taosBuildReqMsgToMnodeEdgeImp;
taosSendMsgToMnode = taosSendMsgToMnodeEdgeImp;
taosSendSimpleRspToMnode = taosSendSimpleRspToMnodeEdgeImp;
}
......@@ -22,6 +22,9 @@
#include "tsdb.h"
#include "vnode.h"
void (*dnodeParseParameterK)() = NULL;
void dnodeParseParameterKComImp() {}
/* Termination handler */
void signal_handler(int signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) {
......@@ -110,3 +113,5 @@ int main(int argc, char *argv[]) {
sleep(1000);
}
}
......@@ -15,83 +15,62 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mgmt.h"
#include "vnode.h"
#include "tsdb.h"
#include "tlog.h"
#include "ttimer.h"
#include "dnodeMgmt.h"
#include "dnodeModule.h"
#include "dnodeService.h"
#include "dnodeSystem.h"
#include "httpSystem.h"
#include "monitorSystem.h"
#include "tcrc32c.h"
#include "tglobalcfg.h"
#include "httpSystem.h"
#include "mgmtSystem.h"
#include "vnode.h"
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
pthread_mutex_t dmutex;
extern int vnodeSelectReqNum;
extern int vnodeInsertReqNum;
void * tsStatusTimer = NULL;
bool tsDnodeStopping = false;
int dnodeCheckConfig();
void dnodeCountRequest(SCountInfo *info);
void dnodeInitModules() {
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers;
tsModule[TSDB_MOD_MGMT].curNum = 0;
tsModule[TSDB_MOD_MGMT].equalVnodeNum = tsMgmtEqualVnodeNum;
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0;
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0;
}
// internal global, not configurable
void * vnodeTmrCtrl;
void ** rpcQhandle;
void * dmQhandle;
void * queryQhandle;
int tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
int tsMaxQueues;
uint32_t tsRebootTime;
int (*dnodeInitStorage)() = NULL;
void (*dnodeCleanupStorage)() = NULL;
int (*dnodeCheckSystem)() = NULL;
int32_t dnodeInitRpcQHandle();
int32_t dnodeInitQueryQHandle();
int32_t dnodeInitTmrCtl();
void dnodeInitPlugin();
void dnodeCountRequestImp(SCountInfo *info);
void dnodeCleanUpSystem() {
if (tsDnodeStopping) return;
tsDnodeStopping = true;
if (tsDnodeStopping) {
return;
} else {
tsDnodeStopping = true;
}
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].stopFp) {
(*tsModule[mod].stopFp)();
}
if (tsModule[mod].num != 0 && tsModule[mod].cleanUpFp) {
(*tsModule[mod].cleanUpFp)();
}
}
if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
dnodeCleanUpModules();
vnodeCleanUpVnodes();
taosCloseLogger();
taosCleanupTier();
dnodeCleanupStorage();
}
void dnodeCheckDbRunning(const char* dir) {
......@@ -109,6 +88,8 @@ int dnodeInitSystem() {
char temp[128];
struct stat dirstat;
dnodeInitPlugin();
taosResolveCRC();
tsRebootTime = taosGetTimestampSec();
......@@ -117,10 +98,14 @@ int dnodeInitSystem() {
// Read global configuration.
tsReadGlobalLogConfig();
if (stat(logDir, &dirstat) < 0) mkdir(logDir, 0755);
if (stat(logDir, &dirstat) < 0) {
mkdir(logDir, 0755);
}
sprintf(temp, "%s/taosdlog", logDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) printf("failed to init log file\n");
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!tsReadGlobalConfig()) { // TODO : Change this function
tsPrintGlobalConfig();
......@@ -128,7 +113,7 @@ int dnodeInitSystem() {
return -1;
}
if (taosCreateTierDirectory() != 0) {
if (dnodeInitStorage() != 0) {
dError("TDengine init tier directory failed");
return -1;
}
......@@ -136,93 +121,93 @@ int dnodeInitSystem() {
dnodeInitMgmtIp();
tsPrintGlobalConfig();
dPrint("Server IP address is:%s", tsPrivateIp);
taosSetCoreDump();
signal(SIGPIPE, SIG_IGN);
dnodeInitModules();
dnodeAllocModules();
pthread_mutex_init(&dmutex, NULL);
dPrint("starting to initialize TDengine ...");
vnodeInitQHandle();
if (dnodeInitSystemSpec() < 0) {
if (dnodeInitRpcQHandle() < 0) {
dError("failed to init query qhandle, exit");
return -1;
}
if (dnodeCheckSystem() < 0) {
return -1;
}
for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].initFp) {
if ((*tsModule[mod].initFp)() != 0) {
dError("TDengine initialization failed");
return -1;
}
}
if (dnodeInitModules() < 0) {
return -1;
}
if (vnodeInitSystem() != 0) {
dError("TDengine vnodes initialization failed");
if (dnodeInitTmrCtl() < 0) {
dError("failed to init timer, exit");
return -1;
}
monitorCountReqFp = dnodeCountRequest;
if (dnodeInitQueryQHandle() < 0) {
dError("failed to init query qhandle, exit");
return -1;
}
dnodeStartModuleSpec();
if (vnodeInitStore() < 0) {
dError("failed to init vnode storage");
return -1;
}
dPrint("TDengine is initialized successfully");
int numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (numOfThreads < 1) numOfThreads = 1;
if (vnodeInitPeer(numOfThreads) < 0) {
dError("failed to init vnode peer communication");
return -1;
}
return 0;
}
if (dnodeInitMgmtConn() < 0) {
dError("failed to init communication to mgmt");
return -1;
}
if (vnodeInitShell() < 0) {
dError("failed to init communication to shell");
return -1;
}
void dnodeProcessModuleStatus(uint32_t status) {
if (tsDnodeStopping) return;
int news = status;
int olds = tsModuleStatus;
for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
int newStatus = news & (1 << moduleType);
int oldStatus = olds & (1 << moduleType);
if (oldStatus > 0) {
if (newStatus == 0) {
if (tsModule[moduleType].stopFp) {
dPrint("module:%s is stopped on this node", tsModule[moduleType].name);
(*tsModule[moduleType].stopFp)();
}
}
} else if (oldStatus == 0) {
if (newStatus > 0) {
if (tsModule[moduleType].startFp) {
dPrint("module:%s is started on this node", tsModule[moduleType].name);
(*tsModule[moduleType].startFp)();
}
}
} else {
}
if (vnodeInitVnodes() < 0) {
dError("failed to init store");
return -1;
}
tsModuleStatus = status;
mnodeCountRequestFp = dnodeCountRequestImp;
dnodeStartModules();
dPrint("TDengine is initialized successfully");
return 0;
}
void dnodeResetSystem() {
dPrint("reset the system ...");
for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) vnodeRemoveVnode(vnode);
for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) {
vnodeRemoveVnode(vnode);
}
mgmtStopSystem();
}
void dnodeCountRequest(SCountInfo *info) {
void dnodeCountRequestImp(SCountInfo *info) {
httpGetReqCount(&info->httpReqNum);
info->selectReqNum = atomic_exchange_32(&vnodeSelectReqNum, 0);
info->insertReqNum = atomic_exchange_32(&vnodeInsertReqNum, 0);
}
//spec
extern SModule tsModule[TSDB_MOD_MAX];
int taosCreateTierDirectory() {
int dnodeInitStorageComImp() {
struct stat dirstat;
strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) {
......@@ -244,16 +229,67 @@ int taosCreateTierDirectory() {
return 0;
}
int dnodeInitSystemSpec() { return 0; }
void dnodeCleanupStorageComImp() {}
void dnodeStartModuleSpec() {
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].startFp) {
if ((*tsModule[mod].startFp)() != 0) {
dError("failed to start module:%d", mod);
}
}
int32_t dnodeInitQueryQHandle() {
int numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore;
if (numOfThreads < 1) {
numOfThreads = 1;
}
int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode;
dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize,numOfThreads);
queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl);
return 0;
}
void dnodeParseParameterK() {}
int32_t dnodeInitTmrCtl() {
vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode");
if (vnodeTmrCtrl == NULL) {
dError("failed to init timer, exit");
return -1;
}
return 0;
}
int32_t dnodeInitRpcQHandle() {
tsMaxQueues = (1.0 - tsRatioOfQueryThreads)*tsNumOfCores*tsNumOfThreadsPerCore / 2.0;
if (tsMaxQueues < 1) tsMaxQueues = 1;
rpcQhandle = malloc(tsMaxQueues*sizeof(void *));
for (int i=0; i< tsMaxQueues; ++i )
rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
return 0;
}
int dnodeCheckSystemComImp() {
return 0;
}
void dnodeInitPlugin() {
dnodeInitMgmtConn = dnodeInitMgmtConnEdgeImp;
dnodeInitMgmtIp = dnodeInitMgmtIpEdgeImp;
taosBuildRspMsgToMnodeWithSize = taosBuildRspMsgToMnodeWithSizeEdgeImp;
taosBuildReqMsgToMnodeWithSize = taosBuildReqMsgToMnodeWithSizeEdgeImp;
taosBuildRspMsgToMnode = taosBuildRspMsgToMnodeEdgeImp;
taosBuildReqMsgToMnode = taosBuildReqMsgToMnodeEdgeImp;
taosSendMsgToMnode = taosSendMsgToMnodeEdgeImp;
taosSendSimpleRspToMnode = taosSendSimpleRspToMnodeEdgeImp;
dnodeParseParameterK = dnodeParseParameterKComImp;
dnodeCheckSystem = dnodeCheckSystemComImp;
dnodeInitStorage = dnodeInitStorageComImp;
dnodeCleanupStorage = dnodeCleanupStorageComImp;
dnodeStartModules = dnodeStartModulesEdgeImp;
}
......@@ -14,100 +14,4 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tsdb.h"
#include "tsocket.h"
#include "vnode.h"
#include "vnodeSystem.h"
// internal global, not configurable
void * vnodeTmrCtrl;
void ** rpcQhandle;
void * dmQhandle;
void * queryQhandle;
int tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
int tsMaxQueues;
uint32_t tsRebootTime;
void vnodeCleanUpSystem() {
vnodeCleanUpVnodes();
}
bool vnodeInitQueryHandle() {
int numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore;
if (numOfThreads < 1) {
numOfThreads = 1;
}
int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode;
dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize,numOfThreads);
queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl);
return true;
}
bool vnodeInitTmrCtl() {
vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode");
if (vnodeTmrCtrl == NULL) {
dError("failed to init timer, exit");
return false;
}
return true;
}
int vnodeInitSystem() {
if (!vnodeInitTmrCtl()) {
dError("failed to init timer, exit");
return -1;
}
if (!vnodeInitQueryHandle()) {
dError("failed to init query qhandle, exit");
return -1;
}
if (vnodeInitStore() < 0) {
dError("failed to init vnode storage");
return -1;
}
int numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (numOfThreads < 1) numOfThreads = 1;
if (vnodeInitPeer(numOfThreads) < 0) {
dError("failed to init vnode peer communication");
return -1;
}
if (dnodeInitMgmtConn() < 0) {
dError("failed to init communication to mgmt");
return -1;
}
if (vnodeInitShell() < 0) {
dError("failed to init communication to shell");
return -1;
}
if (vnodeInitVnodes() < 0) {
dError("failed to init store");
return -1;
}
dPrint("vnode is initialized successfully");
return 0;
}
void vnodeInitQHandle() {
tsMaxQueues = (1.0 - tsRatioOfQueryThreads)*tsNumOfCores*tsNumOfThreadsPerCore / 2.0;
if (tsMaxQueues < 1) tsMaxQueues = 1;
rpcQhandle = malloc(tsMaxQueues*sizeof(void *));
for (int i=0; i< tsMaxQueues; ++i )
rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
}
......@@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
......
......@@ -40,6 +40,7 @@ void mgmtDoStatistic(void *handle, void *tmrId);
void mgmtStartMgmtTimer();
int mgmtStartSystem();
void mgmtStopSystem();
void mgmtCleanUpSystem();
......
......@@ -21,6 +21,7 @@
#include "mgmt.h"
#include "tschemautil.h"
#include "vnodeStatus.h"
#include "dnodeModule.h"
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
int mgmtGetDnodesNum();
......
......@@ -20,6 +20,7 @@
#include "mgmt.h"
#include "tsdb.h"
#include "mgmtSystem.h"
#include "dnodeModule.h"
// global, not configurable
char mgmtDirectory[128];
......
......@@ -3,7 +3,7 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mgmtBalance.h"
#include "vnodeStatus.h"
#include "dnodeModule.h"
void mgmtStartBalanceTimer(int64_t mseconds) {}
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "mgmt.h"
#include "vnodeStatus.h"
#include "dnodeModule.h"
SDnodeObj dnodeObj;
extern uint32_t tsRebootTime;
......
......@@ -28,7 +28,7 @@
extern void *dmQhandle;
void * mgmtStatusTimer = NULL;
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj);
void dnodeProcessMsgFromMgmt(SSchedMsg *sched);
void* dnodeProcessMsgFromMgmtEdgeImp(SSchedMsg *sched);
char *taosBuildRspMsgToDnodeWithSize(SDnodeObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size);
......@@ -67,7 +67,7 @@ int taosSendMsgToDnode(SDnodeObj *pObj, char *msg, int msgLen) {
* Lite version has no message header, so minus one
*/
SSchedMsg schedMsg;
schedMsg.fp = dnodeProcessMsgFromMgmt;
schedMsg.fp = dnodeProcessMsgFromMgmtEdgeImp;
schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL;
schedMsg.thandle = NULL;
......
......@@ -29,6 +29,6 @@ typedef struct {
int httpReqNum;
} SCountInfo;
extern void (*monitorCountReqFp)(SCountInfo *info);
extern void (*mnodeCountRequestFp)(SCountInfo *info);
#endif
\ No newline at end of file
......@@ -77,7 +77,7 @@ void monitorSaveAcctLog(char *acctId, int64_t currentPointsPerSecond, int64_t ma
int64_t totalOutbound, int64_t maxOutbound, int64_t totalDbs, int64_t maxDbs,
int64_t totalUsers, int64_t maxUsers, int64_t totalStreams, int64_t maxStreams,
int64_t totalConns, int64_t maxConns, int8_t accessState);
void (*monitorCountReqFp)(SCountInfo *info) = NULL;
void (*mnodeCountRequestFp)(SCountInfo *info) = NULL;
void monitorExecuteSQL(char *sql);
void monitorCheckDiskUsage(void *para, void *unused) {
......@@ -335,7 +335,7 @@ int monitorBuildBandSql(char *sql) {
int monitorBuildReqSql(char *sql) {
SCountInfo info;
info.httpReqNum = info.insertReqNum = info.selectReqNum = 0;
(*monitorCountReqFp)(&info);
(*mnodeCountRequestFp)(&info);
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.selectReqNum, info.insertReqNum);
}
......@@ -355,7 +355,7 @@ void monitorSaveSystemInfo() {
return;
}
if (monitorCountReqFp == NULL) {
if (mnodeCountRequestFp == NULL) {
return;
}
......
......@@ -564,10 +564,6 @@ char *taosIpStr(uint32_t ipInt) {
return ipStr;
}
#ifndef CLUSTER
void taosCleanupTier() {}
#endif
FORCE_INLINE float taos_align_get_float(const char* pBuf) {
float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)pBuf;
......
......@@ -3,7 +3,7 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
......
......@@ -304,7 +304,7 @@ typedef struct {
extern int tsMeterSizeOnFile;
extern uint32_t tsRebootTime;
extern void ** rpcQhandle;
extern void * dmQhandle;
extern void * queryQhandle;
extern int tsVnodePeers;
extern int tsMaxVnode;
......@@ -524,7 +524,6 @@ SConnSec *vnodeGetMeterSec(int vnode, int sid);
int vnodeCreateMeterObjFile(int vnode);
// mgmt
int dnodeInitMgmtConn();
void vnodeCleanUpMgmt();
......
......@@ -20,19 +20,7 @@
extern "C" {
#endif
void vnodeCleanUpSystem();
void vnodePrintSystemInfo();
int vnodeInitSystem();
int vnodeCfgDynamicOptions(char *msg);
int vnodeInitStore();
int vnodeInitPeer(int numOfThreads);
int dnodeInitMgmtConn();
#ifdef __cplusplus
}
......
......@@ -21,7 +21,6 @@
#include "ttime.h"
#include "tutil.h"
#include "vnode.h"
#include "vnodeMgmt.h"
#include "vnodeShell.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
......
......@@ -3,7 +3,7 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册