提交 dfafa1be 编写于 作者: S slguan

reorganize the file mgmtSystem.c

上级 23693d9c
...@@ -85,6 +85,8 @@ void dnodeSetRunStatus(SDnodeRunStatus status) { ...@@ -85,6 +85,8 @@ void dnodeSetRunStatus(SDnodeRunStatus status) {
} }
void dnodeCleanUpSystem() { void dnodeCleanUpSystem() {
tclearModuleStatus(TSDB_MOD_MGMT);
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_STOPPED) { if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_STOPPED) {
return; return;
} else { } else {
...@@ -131,29 +133,27 @@ void dnodeInitPlugins() { ...@@ -131,29 +133,27 @@ void dnodeInitPlugins() {
} }
int32_t dnodeInitSystem() { int32_t dnodeInitSystem() {
char temp[128]; tsRebootTime = taosGetTimestampSec();
struct stat dirstat; tscEmbedded = 1;
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
taosResolveCRC(); taosResolveCRC();
tsRebootTime = taosGetTimestampSec();
tscEmbedded = 1;
// Read global configuration. // Read global configuration.
tsReadGlobalLogConfig(); tsReadGlobalLogConfig();
struct stat dirstat;
if (stat(logDir, &dirstat) < 0) { if (stat(logDir, &dirstat) < 0) {
mkdir(logDir, 0755); mkdir(logDir, 0755);
} }
char temp[128];
sprintf(temp, "%s/taosdlog", logDir); sprintf(temp, "%s/taosdlog", logDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n"); printf("failed to init log file\n");
} }
if (!tsReadGlobalConfig()) { // TODO : Change this function if (!tsReadGlobalConfig()) {
tsPrintGlobalConfig(); tsPrintGlobalConfig();
dError("TDengine read global config failed"); dError("TDengine read global config failed");
return -1; return -1;
...@@ -249,7 +249,7 @@ int32_t dnodeInitStorageImp() { ...@@ -249,7 +249,7 @@ int32_t dnodeInitStorageImp() {
sprintf(fileName, "%s/data", tsDirectory); sprintf(fileName, "%s/data", tsDirectory);
mkdir(fileName, 0755); mkdir(fileName, 0755);
sprintf(mgmtDirectory, "%s/mgmt", tsDirectory); sprintf(tsMgmtDirectory, "%s/mgmt", tsDirectory);
sprintf(tsDirectory, "%s/tsdb", dataDir); sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDataDirOpenned(dataDir); dnodeCheckDataDirOpenned(dataDir);
......
...@@ -43,12 +43,9 @@ extern "C" { ...@@ -43,12 +43,9 @@ extern "C" {
// internal globals // internal globals
extern char version[]; extern char version[];
extern void *mgmtTmr; extern void *tsMgmtTmr;
extern void *mgmtQhandle; extern void *tsMgmtTranQhandle;
extern void *mgmtTranQhandle; extern char tsMgmtDirectory[];
extern int mgmtShellConns;
extern int mgmtDnodeConns;
extern char mgmtDirectory[];
extern int tsAcctUpdateSize; extern int tsAcctUpdateSize;
extern int tsDbUpdateSize; extern int tsDbUpdateSize;
......
...@@ -28,7 +28,6 @@ void mgmtCleanUpSystem(); ...@@ -28,7 +28,6 @@ void mgmtCleanUpSystem();
extern int32_t (*mgmtInitSystem)(); extern int32_t (*mgmtInitSystem)();
extern int32_t (*mgmtCheckMgmtRunning)(); extern int32_t (*mgmtCheckMgmtRunning)();
extern void (*mgmtDoStatistic)(void *handle, void *tmrId); extern void (*mgmtDoStatistic)(void *handle, void *tmrId);
extern void (*mgmtStartMgmtTimer)();
extern void (*mgmtStopSystem)(); extern void (*mgmtStopSystem)();
extern void (*mgmtCleanUpRedirect)(); extern void (*mgmtCleanUpRedirect)();
......
...@@ -83,7 +83,7 @@ int32_t mgmtInitDbs() { ...@@ -83,7 +83,7 @@ int32_t mgmtInitDbs() {
mgmtDbActionInit(); mgmtDbActionInit();
dbSdb = sdbOpenTable(tsMaxDbs, sizeof(SDbObj), "db", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtDbAction); dbSdb = sdbOpenTable(tsMaxDbs, sizeof(SDbObj), "db", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtDbAction);
if (dbSdb == NULL) { if (dbSdb == NULL) {
mError("failed to init db data"); mError("failed to init db data");
return -1; return -1;
......
...@@ -556,7 +556,7 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { ...@@ -556,7 +556,7 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) {
pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
pVload->status = TSDB_VN_STATUS_OFFLINE; pVload->status = TSDB_VN_STATUS_OFFLINE;
mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode); mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, mgmtTmr); taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr);
} }
} }
...@@ -590,7 +590,7 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { ...@@ -590,7 +590,7 @@ void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) {
} }
} }
taosTmrReset(mgmtProcessDnodeStatus, tsStatusInterval * 1000, NULL, mgmtTmr, &mgmtStatusTimer); taosTmrReset(mgmtProcessDnodeStatus, tsStatusInterval * 1000, NULL, tsMgmtTmr, &mgmtStatusTimer);
if (mgmtStatusTimer == NULL) { if (mgmtStatusTimer == NULL) {
mError("Failed to start status timer"); mError("Failed to start status timer");
} }
......
...@@ -46,7 +46,7 @@ static RetrieveMetaFp* mgmtRetrieveFp; ...@@ -46,7 +46,7 @@ static RetrieveMetaFp* mgmtRetrieveFp;
static void mgmtInitShowMsgFp(); static void mgmtInitShowMsgFp();
void * pShellConn = NULL; void * tsShellConn = NULL;
SConnObj *connList; SConnObj *connList;
void * mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle); void * mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle);
int (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int, SConnObj *); int (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int, SConnObj *);
...@@ -95,8 +95,8 @@ int mgmtInitShell() { ...@@ -95,8 +95,8 @@ int mgmtInitShell() {
rpcInit.idleTime = tsShellActivityTimer * 2000; rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.afp = mgmtRetriveUserAuthInfo; rpcInit.afp = mgmtRetriveUserAuthInfo;
pShellConn = rpcOpen(&rpcInit); tsShellConn = rpcOpen(&rpcInit);
if (pShellConn == NULL) { if (tsShellConn == NULL) {
mError("failed to init tcp connection to shell"); mError("failed to init tcp connection to shell");
return -1; return -1;
} }
...@@ -105,9 +105,9 @@ int mgmtInitShell() { ...@@ -105,9 +105,9 @@ int mgmtInitShell() {
} }
void mgmtCleanUpShell() { void mgmtCleanUpShell() {
if (pShellConn) { if (tsShellConn) {
rpcClose(pShellConn); rpcClose(tsShellConn);
pShellConn = NULL; tsShellConn = NULL;
} }
tfree(connList); tfree(connList);
} }
...@@ -1489,7 +1489,7 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -1489,7 +1489,7 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
// schedMsg.tfp = NULL; // schedMsg.tfp = NULL;
// schedMsg.thandle = pConn; // schedMsg.thandle = pConn;
// //
// taosScheduleTask(mgmtTranQhandle, &schedMsg); // taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
// } else { // } else {
// mError("%s from shell is not processed", taosMsg[pMsg->msgType]); // mError("%s from shell is not processed", taosMsg[pMsg->msgType]);
// } // }
......
...@@ -175,7 +175,7 @@ int32_t mgmtInitSuperTables() { ...@@ -175,7 +175,7 @@ int32_t mgmtInitSuperTables() {
mgmtSuperTableActionInit(); mgmtSuperTableActionInit();
tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN, tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"meters", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtSuperTableAction); "meters", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction);
if (tsSuperTableSdb == NULL) { if (tsSuperTableSdb == NULL) {
mError("failed to init meter data"); mError("failed to init meter data");
return -1; return -1;
......
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosdef.h"
#include "dnodeSystem.h" #include "tsched.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "mgmtBalance.h" #include "mgmtBalance.h"
...@@ -29,27 +29,17 @@ ...@@ -29,27 +29,17 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "taosdef.h"
// global, not configurable char tsMgmtDirectory[128] = {0};
char mgmtDirectory[128]; void *tsMgmtTmr = NULL;
void * mgmtTmr; void *tsMgmtTranQhandle = NULL;
void * mgmtQhandle = NULL; void *tsMgmtStatisTimer = NULL;
void * mgmtTranQhandle = NULL;
void * mgmtStatisticTimer = NULL;
int mgmtShellConns = 0;
int mgmtDnodeConns = 0;
extern void * pShellConn;
extern void ** tsRpcQhandle;
extern SMgmtIpList mgmtIpList;
extern SMgmtIpList mgmtPublicIpList;
extern char mgmtIpStr[TSDB_MAX_MGMT_IPS][20];
extern void * acctSdb;
void mgmtCleanUpSystem() { void mgmtCleanUpSystem() {
if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) { mPrint("starting to clean up mgmt");
mTrace("mgmt is running, clean it up");
taosTmrStopA(&mgmtStatisticTimer); taosTmrStopA(&tsMgmtStatisTimer);
mgmtCleanUpRedirect();
sdbCleanUpPeers(); sdbCleanUpPeers();
mgmtCleanupBalance(); mgmtCleanupBalance();
mgmtCleanUpDnodeInt(); mgmtCleanUpDnodeInt();
...@@ -60,29 +50,18 @@ void mgmtCleanUpSystem() { ...@@ -60,29 +50,18 @@ void mgmtCleanUpSystem() {
mgmtCleanUpDnodes(); mgmtCleanUpDnodes();
mgmtCleanUpUsers(); mgmtCleanUpUsers();
mgmtCleanUpAccts(); mgmtCleanUpAccts();
taosTmrCleanUp(mgmtTmr); taosTmrCleanUp(tsMgmtTmr);
taosCleanUpScheduler(mgmtQhandle); taosCleanUpScheduler(tsMgmtTranQhandle);
taosCleanUpScheduler(mgmtTranQhandle);
} else {
mgmtCleanUpRedirect();
}
mgmtTmr = NULL;
mgmtQhandle = NULL;
mgmtShellConns = 0;
mgmtDnodeConns = 0;
tclearModuleStatus(TSDB_MOD_MGMT);
pShellConn = NULL;
mTrace("mgmt is cleaned up"); mPrint("mgmt is cleaned up");
} }
int mgmtStartSystem() { int32_t mgmtStartSystem() {
mPrint("starting to initialize TDengine mgmt ..."); mPrint("starting to initialize TDengine mgmt ...");
struct stat dirstat; struct stat dirstat;
if (stat(mgmtDirectory, &dirstat) < 0) { if (stat(tsMgmtDirectory, &dirstat) < 0) {
mkdir(mgmtDirectory, 0755); mkdir(tsMgmtDirectory, 0755);
} }
if (mgmtCheckMgmtRunning() != 0) { if (mgmtCheckMgmtRunning() != 0) {
...@@ -90,14 +69,10 @@ int mgmtStartSystem() { ...@@ -90,14 +69,10 @@ int mgmtStartSystem() {
return 0; return 0;
} }
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 2.0; tsMgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT");
if (numOfThreads < 1) numOfThreads = 1;
mgmtQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, numOfThreads, "mnode");
mgmtTranQhandle = taosInitScheduler(tsMaxDnodes + tsMaxShellConns, 1, "mnodeT"); tsMgmtTmr = taosTmrInit((tsMaxDnodes + tsMaxShellConns) * 3, 200, 3600000, "MND");
if (tsMgmtTmr == NULL) {
mgmtTmr = taosTmrInit((tsMaxDnodes + tsMaxShellConns) * 3, 200, 3600000, "MND");
if (mgmtTmr == NULL) {
mError("failed to init timer, exit"); mError("failed to init timer, exit");
return -1; return -1;
} }
...@@ -142,7 +117,7 @@ int mgmtStartSystem() { ...@@ -142,7 +117,7 @@ int mgmtStartSystem() {
return -1; return -1;
} }
if (sdbInitPeers(mgmtDirectory) < 0) { if (sdbInitPeers(tsMgmtDirectory) < 0) {
mError("failed to init peers"); mError("failed to init peers");
return -1; return -1;
} }
...@@ -153,9 +128,7 @@ int mgmtStartSystem() { ...@@ -153,9 +128,7 @@ int mgmtStartSystem() {
mgmtCheckAcct(); mgmtCheckAcct();
taosTmrReset(mgmtDoStatistic, tsStatusInterval * 30000, NULL, mgmtTmr, &mgmtStatisticTimer); taosTmrReset(mgmtDoStatistic, tsStatusInterval * 30000, NULL, tsMgmtTmr, &tsMgmtStatisTimer);
mgmtStartMgmtTimer();
mPrint("TDengine mgmt is initialized successfully"); mPrint("TDengine mgmt is initialized successfully");
...@@ -163,26 +136,31 @@ int mgmtStartSystem() { ...@@ -163,26 +136,31 @@ int mgmtStartSystem() {
} }
int32_t mgmtInitSystemImp() { int32_t mgmtInitSystemImp() {
return mgmtStartSystem(); int32_t code = mgmtStartSystem();
if (code != 0) {
return code;
}
taosTmrReset(mgmtProcessDnodeStatus, 500, NULL, tsMgmtTmr, &mgmtStatusTimer);
return code;
} }
int32_t (*mgmtInitSystem)() = mgmtInitSystemImp; int32_t (*mgmtInitSystem)() = mgmtInitSystemImp;
int32_t mgmtCheckMgmtRunningImp() { return 0; } int32_t mgmtCheckMgmtRunningImp() {
return 0;
}
int32_t (*mgmtCheckMgmtRunning)() = mgmtCheckMgmtRunningImp; int32_t (*mgmtCheckMgmtRunning)() = mgmtCheckMgmtRunningImp;
void mgmtDoStatisticImp(void *handle, void *tmrId) {} void mgmtDoStatisticImp(void *handle, void *tmrId) {}
void (*mgmtDoStatistic)(void *handle, void *tmrId) = mgmtDoStatisticImp;
void mgmtStartMgmtTimerImp() { void (*mgmtDoStatistic)(void *handle, void *tmrId) = mgmtDoStatisticImp;
taosTmrReset(mgmtProcessDnodeStatus, 500, NULL, mgmtTmr, &mgmtStatusTimer);
}
void (*mgmtStartMgmtTimer)() = mgmtStartMgmtTimerImp;
void mgmtStopSystemImp() {} void mgmtStopSystemImp() {}
void (*mgmtStopSystem)() = mgmtStopSystemImp; void (*mgmtStopSystem)() = mgmtStopSystemImp;
void mgmtCleanUpRedirectImp() {} void mgmtCleanUpRedirectImp() {}
void (*mgmtCleanUpRedirect)() = mgmtCleanUpRedirectImp;
void (*mgmtCleanUpRedirect)() = mgmtCleanUpRedirectImp;
...@@ -68,7 +68,7 @@ int mgmtInitUsers() { ...@@ -68,7 +68,7 @@ int mgmtInitUsers() {
mgmtUserActionInit(); mgmtUserActionInit();
userSdb = sdbOpenTable(tsMaxUsers, sizeof(SUserObj), "user", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtUserAction); userSdb = sdbOpenTable(tsMaxUsers, sizeof(SUserObj), "user", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtUserAction);
if (userSdb == NULL) { if (userSdb == NULL) {
mError("failed to init user data"); mError("failed to init user data");
return -1; return -1;
......
...@@ -76,7 +76,7 @@ int mgmtInitVgroups() { ...@@ -76,7 +76,7 @@ int mgmtInitVgroups() {
SVgObj tObj; SVgObj tObj;
tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
vgSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, mgmtDirectory, mgmtVgroupAction); vgSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction);
if (vgSdb == NULL) { if (vgSdb == NULL) {
mError("failed to init vgroup data"); mError("failed to init vgroup data");
return -1; return -1;
...@@ -205,7 +205,7 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { ...@@ -205,7 +205,7 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
mError("db:%s, no enough free dnode to alloc %d vnodes", pDb->name, pVgroup->numOfVnodes); mError("db:%s, no enough free dnode to alloc %d vnodes", pDb->name, pVgroup->numOfVnodes);
free(pVgroup); free(pVgroup);
pDb->vgStatus = TSDB_VG_STATUS_FULL; pDb->vgStatus = TSDB_VG_STATUS_FULL;
taosTmrReset(mgmtProcessVgTimer, 5000, pDb, mgmtTmr, &pDb->vgTimer); taosTmrReset(mgmtProcessVgTimer, 5000, pDb, tsMgmtTmr, &pDb->vgTimer);
return NULL; return NULL;
} }
......
...@@ -262,7 +262,7 @@ SGlobalConfig *tsGetConfigOption(const char *option); ...@@ -262,7 +262,7 @@ SGlobalConfig *tsGetConfigOption(const char *option);
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
extern char mgmtDirectory[]; extern char tsMgmtDirectory[];
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -564,7 +564,7 @@ extern int (*pDecompFunc[])(const char *const input, int compressedSize, const i ...@@ -564,7 +564,7 @@ extern int (*pDecompFunc[])(const char *const input, int compressedSize, const i
// global variable and APIs provided by mgmt // global variable and APIs provided by mgmt
extern char mgmtStatus; extern char mgmtStatus;
extern char mgmtDirectory[]; extern char tsMgmtDirectory[];
extern const int16_t vnodeFileVersion; extern const int16_t vnodeFileVersion;
#ifdef __cplusplus #ifdef __cplusplus
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册