提交 3006a756 编写于 作者: S slguan

refactor dnode module code

上级 46c31a24
......@@ -20,9 +20,9 @@
extern "C" {
#endif
void dnodeAllocModules();
int32_t dnodeInitModules();
void dnodeCleanUpModules();
void dnodeStartModules();
#ifdef __cplusplus
}
......
......@@ -22,7 +22,7 @@ extern "C" {
int32_t dnodeInitRead();
void dnodeCleanupRead();
void dnodeRead(void *pMsg);
void dnodeRead(SRpcMsg *pMsg);
void * dnodeAllocateReadWorker();
void dnodeFreeReadWorker(void *rqueue);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_SYSTEM_H
#define TDENGINE_DNODE_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
TSDB_DNODE_RUN_STATUS_INITIALIZE,
TSDB_DNODE_RUN_STATUS_RUNING,
TSDB_DNODE_RUN_STATUS_STOPPED
} SDnodeRunStatus;
int32_t dnodeInitSystem();
void dnodeCleanUpSystem();
SDnodeRunStatus dnodeGetRunStatus();
#ifdef __cplusplus
}
#endif
#endif
......@@ -22,7 +22,7 @@ extern "C" {
int32_t dnodeInitWrite();
void dnodeCleanupWrite();
void dnodeWrite(void *pMsg);
void dnodeWrite(SRpcMsg *pMsg);
void * dnodeAllocateWriteWorker();
void dnodeFreeWriteWorker(void *worker);
......
......@@ -18,7 +18,7 @@
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "dnodeSystem.h"
#include "dnode.h"
static void (*dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
......
......@@ -15,41 +15,46 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "tglobalcfg.h"
#include "dnodeSystem.h"
#include "tlog.h"
#include "tmodule.h"
#include "trpc.h"
#include "tutil.h"
#include "dnode.h"
#include "dnodeMClient.h"
#include "dnodeMgmt.h"
#include "dnodeMnode.h"
#include "dnodeModule.h"
#include "dnodeRead.h"
#include "dnodeShell.h"
#include "dnodeWrite.h"
#ifdef CLUSTER
#include "account.h"
#include "admin.h"
#include "balance.h"
#include "cluster.h"
#include "grant.h"
#include "mpeer.h"
#include "storage.h"
#include "vpeer.h"
#endif
static int32_t dnodeInitSystem();
static int32_t dnodeInitStorage();
static void dnodeInitPlugins();
static void dnodeCleanupStorage();
static void dnodeCleanUpSystem();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
void (*dnodeParseParameterKFp)() = NULL;
/*
* Termination handler
*/
void signal_handler(int signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) {
tsCfgDynamicOptions("debugFlag 135");
return;
}
if (signum == SIGUSR2) {
tsCfgDynamicOptions("resetlog");
return;
}
syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service...");
// clean the system.
dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
dnodeCleanUpSystem();
// close the syslog
syslog(LOG_INFO, "Shut down TDengine service successfully");
dPrint("TDengine is shut down!");
closelog();
exit(EXIT_SUCCESS);
}
int main(int argc, char *argv[]) {
int32_t main(int32_t argc, char *argv[]) {
dnodeInitPlugins();
// Set global configuration file
for (int i = 1; i < argc; ++i) {
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
strcpy(configDir, argv[++i]);
......@@ -63,10 +68,11 @@ int main(int argc, char *argv[]) {
printf("gitinfo: %s\n", gitinfo);
printf("gitinfoI: %s\n", gitinfoOfInternal);
printf("buildinfo: %s\n", buildinfo);
return 0;
exit(EXIT_SUCCESS);
} else if (strcmp(argv[i], "-k") == 0) {
if (dnodeParseParameterKFp) {
dnodeParseParameterKFp();
exit(EXIT_SUCCESS);
}
#ifdef TAOS_MEM_CHECK
} else if (strcmp(argv[i], "--alloc-random-fail") == 0) {
......@@ -94,7 +100,6 @@ int main(int argc, char *argv[]) {
sigaction(SIGINT, &act, NULL);
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
// sigaction(SIGABRT, &act, NULL);
// Open /var/log/syslog file to record information.
openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1);
......@@ -116,4 +121,140 @@ int main(int argc, char *argv[]) {
}
}
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) {
tsCfgDynamicOptions("debugFlag 135");
return;
}
if (signum == SIGUSR2) {
tsCfgDynamicOptions("resetlog");
return;
}
syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service...");
// clean the system.
dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
dnodeCleanUpSystem();
// close the syslog
syslog(LOG_INFO, "Shut down TDengine service successfully");
dPrint("TDengine is shut down!");
closelog();
exit(EXIT_SUCCESS);
}
static int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
tscEmbedded = 1;
taosResolveCRC();
tsReadGlobalLogConfig();
taosSetCoreDump();
signal(SIGPIPE, SIG_IGN);
struct stat dirstat;
if (stat(logDir, &dirstat) < 0) {
mkdir(logDir, 0755);
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", logDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!tsReadGlobalConfig()) {
tsPrintGlobalConfig();
dError("TDengine read global config failed");
return -1;
}
tsPrintGlobalConfig();
dPrint("Server IP address is:%s", tsPrivateIp);
dPrint("starting to initialize TDengine ...");
if (dnodeInitStorage() != 0) return -1;
if (dnodeInitModules() != 0) return -1;
if (dnodeInitRead() != 0) return -1;
if (dnodeInitWrite() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1;
if (dnodeInitMnode() != 0) return -1;
if (dnodeInitMClient() != 0) return -1;
if (dnodeInitShell() != 0) return -1;
dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
dPrint("TDengine is initialized successfully");
return 0;
}
static void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
tclearModuleStatus(TSDB_MOD_MGMT);
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupShell();
dnodeCleanupMClient();
dnodeCleanupMnode();
dnodeCleanupMgmt();
dnodeCleanupWrite();
dnodeCleanupRead();
dnodeCleanUpModules();
dnodeCleanupStorage();
taosCloseLogger();
}
}
SDnodeRunStatus dnodeGetRunStatus() {
return tsDnodeRunStatus;
}
static void dnodeSetRunStatus(SDnodeRunStatus status) {
tsDnodeRunStatus = status;
}
static void dnodeCheckDataDirOpenned(char *dir) {
char filepath[256] = {0};
sprintf(filepath, "%s/.running", dir);
int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
if (ret != 0) {
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
exit(0);
}
}
static int32_t dnodeInitStorage() {
struct stat dirstat;
strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) {
mkdir(dataDir, 0755);
}
char fileName[128];
sprintf(fileName, "%s/tsdb", tsDirectory);
mkdir(fileName, 0755);
sprintf(fileName, "%s/data", tsDirectory);
mkdir(fileName, 0755);
sprintf(tsMgmtDirectory, "%s/mgmt", tsDirectory);
sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDataDirOpenned(dataDir);
dPrint("storage directory is initialized");
return 0;
}
static void dnodeCleanupStorage() {}
static void dnodeInitPlugins() {
#ifdef CLUSTER
// acctInit();
// adminInit();
// balanceInit();
// clusterInit();
// grantInit();
// mpeerInit();
// storageInit();
#endif
}
......@@ -21,7 +21,7 @@
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
#include "tsdb.h"
//#include "tsdb.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
......@@ -60,7 +60,8 @@ int32_t dnodeInitMgmt() {
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
dError("failed to init vnode list");
return -1;
}
return dnodeOpenVnodes();
......@@ -139,111 +140,111 @@ static void dnodeCleanupVnodes() {
}
static int32_t dnodeOpenVnode(int32_t vgId) {
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
void *pTsdb = tsdbOpenRepo(rootDir);
if (pTsdb != NULL) {
return terrno;
}
SVnodeObj vnodeObj;
vnodeObj.vgId = vgId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
// char rootDir[TSDB_FILENAME_LEN] = {0};
// sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
//
// void *pTsdb = tsdbOpenRepo(rootDir);
// if (pTsdb != NULL) {
// return terrno;
// }
//
// SVnodeObj vnodeObj;
// vnodeObj.vgId = vgId;
// vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
// vnodeObj.refCount = 1;
// vnodeObj.version = 0;
// vnodeObj.wworker = dnodeAllocateWriteWorker();
// vnodeObj.rworker = dnodeAllocateReadWorker();
// vnodeObj.wal = NULL;
// vnodeObj.tsdb = pTsdb;
// vnodeObj.replica = NULL;
// vnodeObj.events = NULL;
// vnodeObj.cq = NULL;
//
// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
return TSDB_CODE_SUCCESS;
}
static void dnodeCleanupVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_NOT_READY;
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count > 0) {
// wait refcount
}
// remove replica
// remove read queue
dnodeFreeReadWorker(pVnode->rworker);
pVnode->rworker = NULL;
// remove write queue
dnodeFreeWriteWorker(pVnode->wworker);
pVnode->wworker = NULL;
// remove wal
// remove tsdb
if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb);
pVnode->tsdb = NULL;
}
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
// pVnode->status = TSDB_VN_STATUS_NOT_READY;
// int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
// if (count > 0) {
// // wait refcount
// }
//
// // remove replica
//
// // remove read queue
// dnodeFreeReadWorker(pVnode->rworker);
// pVnode->rworker = NULL;
//
// // remove write queue
// dnodeFreeWriteWorker(pVnode->wworker);
// pVnode->wworker = NULL;
//
// // remove wal
//
// // remove tsdb
// if (pVnode->tsdb) {
// tsdbCloseRepo(pVnode->tsdb);
// pVnode->tsdb = NULL;
// }
//
// taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
}
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg;
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->vnode;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId);
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
if (pTsdb != NULL) {
return terrno;
}
SVnodeObj vnodeObj;
vnodeObj.vgId = pVnodeCfg->cfg.vgId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
// STsdbCfg tsdbCfg;
// tsdbCfg.precision = pVnodeCfg->cfg.precision;
// tsdbCfg.tsdbId = pVnodeCfg->vnode;
// tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
// tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
// tsdbCfg.minRowsPerFileBlock = -1;
// tsdbCfg.maxRowsPerFileBlock = -1;
// tsdbCfg.keep = -1;
// tsdbCfg.maxCacheSize = -1;
// char rootDir[TSDB_FILENAME_LEN] = {0};
// sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId);
//
// void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
// if (pTsdb != NULL) {
// return terrno;
// }
//
// SVnodeObj vnodeObj;
// vnodeObj.vgId = pVnodeCfg->cfg.vgId;
// vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
// vnodeObj.refCount = 1;
// vnodeObj.version = 0;
// vnodeObj.wworker = dnodeAllocateWriteWorker();
// vnodeObj.rworker = dnodeAllocateReadWorker();
// vnodeObj.wal = NULL;
// vnodeObj.tsdb = pTsdb;
// vnodeObj.replica = NULL;
// vnodeObj.events = NULL;
// vnodeObj.cq = NULL;
//
// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
return TSDB_CODE_SUCCESS;
}
static void dnodeDropVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_NOT_READY;
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count > 0) {
// wait refcount
}
if (pVnode->tsdb) {
tsdbDropRepo(pVnode->tsdb);
pVnode->tsdb = NULL;
}
dnodeCleanupVnode(pVnode);
// pVnode->status = TSDB_VN_STATUS_NOT_READY;
//
// int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
// if (count > 0) {
// // wait refcount
// }
//
// if (pVnode->tsdb) {
// tsdbDropRepo(pVnode->tsdb);
// pVnode->tsdb = NULL;
// }
//
// dnodeCleanupVnode(pVnode);
}
static void dnodeProcesSMDCreateVnodeMsg(SRpcMsg *rpcMsg) {
......
......@@ -17,7 +17,7 @@
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "dnodeSystem.h"
#include "dnode.h"
#include "dnodeMgmt.h"
#include "dnodeWrite.h"
......
......@@ -22,9 +22,9 @@
#include "http.h"
#include "monitor.h"
#include "dnodeModule.h"
#include "dnodeSystem.h"
#include "dnode.h"
void dnodeAllocModules() {
static void dnodeAllocModules() {
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
......@@ -69,6 +69,8 @@ void dnodeCleanUpModules() {
}
int32_t dnodeInitModules() {
dnodeAllocModules();
for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].initFp) {
if ((*tsModule[mod].initFp)() != 0) {
......@@ -81,7 +83,7 @@ int32_t dnodeInitModules() {
return TSDB_CODE_SUCCESS;
}
void dnodeStartModulesImp() {
void dnodeStartModules() {
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].startFp) {
if ((*tsModule[mod].startFp)() != 0) {
......@@ -90,5 +92,3 @@ void dnodeStartModulesImp() {
}
}
}
void (*dnodeStartModules)() = dnodeStartModulesImp;
......@@ -67,9 +67,7 @@ void dnodeCleanupRead() {
taosCloseQset(readQset);
}
void dnodeRead(void *rpcMsg) {
SRpcMsg *pMsg = rpcMsg;
void dnodeRead(SRpcMsg *pMsg) {
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
......
......@@ -20,7 +20,7 @@
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "dnodeSystem.h"
#include "dnode.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
#include "dnodeShell.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tcrc32c.h"
#include "tlog.h"
#include "tmodule.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "http.h"
#include "trpc.h"
#include "dnode.h"
#include "dnodeMgmt.h"
#include "dnodeModule.h"
#include "dnodeShell.h"
#include "dnodeSystem.h"
#ifdef CLUSTER
#include "account.h"
#include "admin.h"
#include "balance.h"
#include "cluster.h"
#include "grant.h"
#include "mpeer.h"
#include "storage.h"
#include "vpeer.h"
#endif
static pthread_mutex_t tsDnodeMutex;
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitRpcQHandle();
static int32_t dnodeInitTmrCtl();
int32_t (*dnodeInitStorage)() = NULL;
void (*dnodeCleanupStorage)() = NULL;
int32_t (*dnodeInitPeers)(int32_t numOfThreads) = NULL;
void *tsDnodeTmr;
void **tsRpcQhandle;
int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1;
int32_t tsMaxQueues;
uint32_t tsRebootTime;
static void dnodeInitVnodesLock() {
pthread_mutex_init(&tsDnodeMutex, NULL);
}
void dnodeLockVnodes() {
pthread_mutex_lock(&tsDnodeMutex);
}
void dnodeUnLockVnodes() {
pthread_mutex_unlock(&tsDnodeMutex);
}
static void dnodeCleanVnodesLock() {
pthread_mutex_destroy(&tsDnodeMutex);
}
SDnodeRunStatus dnodeGetRunStatus() {
return tsDnodeRunStatus;
}
void dnodeSetRunStatus(SDnodeRunStatus status) {
tsDnodeRunStatus = status;
}
void dnodeCleanUpSystem() {
tclearModuleStatus(TSDB_MOD_MGMT);
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_STOPPED) {
return;
} else {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
}
dnodeCleanupShell();
dnodeCleanUpModules();
dnodeCleanupMgmt();
taosCloseLogger();
dnodeCleanupStorage();
dnodeCleanVnodesLock();
}
void dnodeCheckDataDirOpenned(const char *dir) {
char filepath[256] = {0};
sprintf(filepath, "%s/.running", dir);
int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
if (ret != 0) {
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
exit(0);
}
}
void dnodeInitPlugins() {
#ifdef CLUSTER
// acctInit();
// adminInit();
// balanceInit();
// clusterInit();
// grantInit();
// mpeerInit();
// storageInit();
#endif
}
int32_t dnodeInitSystem() {
tsRebootTime = taosGetTimestampSec();
tscEmbedded = 1;
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
taosResolveCRC();
// Read global configuration.
tsReadGlobalLogConfig();
struct stat dirstat;
if (stat(logDir, &dirstat) < 0) {
mkdir(logDir, 0755);
}
char temp[128];
sprintf(temp, "%s/taosdlog", logDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!tsReadGlobalConfig()) {
tsPrintGlobalConfig();
dError("TDengine read global config failed");
return -1;
}
if (dnodeInitStorage() != 0) {
dError("TDengine init tier directory failed");
return -1;
}
// dnodeInitMgmtIp();
tsPrintGlobalConfig();
dPrint("Server IP address is:%s", tsPrivateIp);
taosSetCoreDump();
signal(SIGPIPE, SIG_IGN);
dnodeAllocModules();
dnodeInitVnodesLock();
dPrint("starting to initialize TDengine ...");
if (dnodeInitRpcQHandle() < 0) {
dError("failed to init query qhandle, exit");
return -1;
}
if (dnodeCheckSystem() < 0) {
return -1;
}
if (dnodeInitModules() < 0) {
return -1;
}
if (dnodeInitTmrCtl() < 0) {
dError("failed to init timer, exit");
return -1;
}
if (dnodeInitMgmt() < 0) {
dError("failed to init vnode storage");
return -1;
}
int32_t numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (numOfThreads < 1) numOfThreads = 1;
if (dnodeInitPeers(numOfThreads) < 0) {
dError("failed to init vnode peer communication");
return -1;
}
if (dnodeInitMgmt() < 0) {
dError("failed to init communication to mgmt");
return -1;
}
if (dnodeInitShell() < 0) {
dError("failed to init communication to shell");
return -1;
}
dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
dPrint("TDengine is initialized successfully");
return 0;
}
int32_t dnodeInitStorageImp() {
struct stat dirstat;
strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) {
mkdir(dataDir, 0755);
}
char fileName[128];
sprintf(fileName, "%s/tsdb", tsDirectory);
mkdir(fileName, 0755);
sprintf(fileName, "%s/data", tsDirectory);
mkdir(fileName, 0755);
sprintf(tsMgmtDirectory, "%s/mgmt", tsDirectory);
sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDataDirOpenned(dataDir);
return 0;
}
static int32_t dnodeInitTmrCtl() {
tsDnodeTmr = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000,
"DND-vnode");
if (tsDnodeTmr == NULL) {
dError("failed to init timer, exit");
return -1;
}
return 0;
}
static int32_t dnodeInitRpcQHandle() {
tsMaxQueues = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (tsMaxQueues < 1) {
tsMaxQueues = 1;
}
tsRpcQhandle = malloc(tsMaxQueues * sizeof(void *));
for (int32_t i = 0; i < tsMaxQueues; ++i) {
tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
}
return 0;
}
int32_t dnodeCheckSystemImp() {
return 0;
}
int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp;
int32_t dnodeInitPeersImp(int32_t numOfThreads) {
return 0;
}
......@@ -79,9 +79,7 @@ void dnodeCleanupWrite() {
free(wWorkerPool.writeWorker);
}
void dnodeWrite(void *rpcMsg) {
SRpcMsg *pMsg = rpcMsg;
void dnodeWrite(SRpcMsg *pMsg) {
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
int32_t contLen = 0;
......
......@@ -21,7 +21,7 @@ extern "C" {
#endif
#include <stdint.h>
#include <pthread.h>
#include <stdbool.h>
typedef struct {
int32_t queryReqNum;
......@@ -29,32 +29,13 @@ typedef struct {
int32_t httpReqNum;
} SDnodeStatisInfo;
typedef struct {
char id[20];
char sid;
void *thandle;
int mgmtIndex;
char status; // 0:offline, 1:online
} SMgmtObj;
// global variables
extern uint32_t tsRebootTime;
// dnodeCluster
extern void (*dnodeStartModules)();
extern int32_t (*dnodeCheckSystem)();
// dnodeSystem
void dnodeCheckDataDirOpenned(const char* dir);
// dnodeModule
extern void (*dnodeStartModules)();
typedef enum {
TSDB_DNODE_RUN_STATUS_INITIALIZE,
TSDB_DNODE_RUN_STATUS_RUNING,
TSDB_DNODE_RUN_STATUS_STOPPED
} SDnodeRunStatus;
void dnodeLockVnodes();
void dnodeUnLockVnodes();
SDnodeRunStatus dnodeGetRunStatus();
SDnodeStatisInfo dnodeGetStatisInfo();
#ifdef __cplusplus
......
......@@ -21,7 +21,7 @@
#include "trpc.h"
#include "tstatus.h"
#include "tsched.h"
#include "dnodeSystem.h"
#include "dnode.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtBalance.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册