提交 6321b0df 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'haoyifan/fix_init' into develop

...@@ -22,7 +22,7 @@ extern "C" { ...@@ -22,7 +22,7 @@ extern "C" {
int32_t dnodeInitModules(); int32_t dnodeInitModules();
void dnodeStartModules(); void dnodeStartModules();
void dnodeCleanUpModules(); void dnodeCleanupModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus); void dnodeProcessModuleStatus(uint32_t moduleStatus);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -36,6 +36,46 @@ static void dnodeCleanupStorage(); ...@@ -36,6 +36,46 @@ static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SDnodeRunStatus status); static void dnodeSetRunStatus(SDnodeRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir); static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitComponents();
static void dnodeCleanupComponents(int32_t stepId);
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
} SDnodeComponent;
static const SDnodeComponent SDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
{"client", dnodeInitClient, dnodeCleanupClient},
{"server", dnodeInitServer, dnodeCleanupServer},
{"mgmt", dnodeInitMgmt, dnodeCleanupMgmt},
{"modules", dnodeInitModules, dnodeCleanupModules},
{"shell", dnodeInitShell, dnodeCleanupShell}
};
static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
SDnodeComponents[i].cleanup();
}
}
static int32_t dnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) {
if (SDnodeComponents[i].init() != 0) {
dnodeCleanupComponents(i);
code = -1;
break;
}
}
return code;
}
int32_t dnodeInitSystem() { int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
...@@ -67,17 +107,9 @@ int32_t dnodeInitSystem() { ...@@ -67,17 +107,9 @@ int32_t dnodeInitSystem() {
dPrint("start to initialize TDengine on %s", tsLocalEp); dPrint("start to initialize TDengine on %s", tsLocalEp);
if (dnodeInitStorage() != 0) return -1; if (dnodeInitComponents() != 0) {
if (dnodeInitVnodeRead() != 0) return -1; return -1;
if (dnodeInitVnodeWrite() != 0) return -1; }
if (dnodeInitMnodeRead() != 0) return -1;
if (dnodeInitMnodeWrite() != 0) return -1;
if (dnodeInitMnodePeer() != 0) return -1;
if (dnodeInitClient() != 0) return -1;
if (dnodeInitServer() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1;
if (dnodeInitModules() != 0) return -1;
if (dnodeInitShell() != 0) return -1;
dnodeStartModules(); dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
...@@ -90,17 +122,7 @@ int32_t dnodeInitSystem() { ...@@ -90,17 +122,7 @@ int32_t dnodeInitSystem() {
void dnodeCleanUpSystem() { void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupShell(); dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1);
dnodeCleanUpModules();
dnodeCleanupMgmt();
dnodeCleanupServer();
dnodeCleanupClient();
dnodeCleanupMnodePeer();
dnodeCleanupMnodeWrite();
dnodeCleanupMnodeRead();
dnodeCleanupVnodeWrite();
dnodeCleanupVnodeRead();
dnodeCleanupStorage();
taos_cleanup(); taos_cleanup();
taosCloseLog(); taosCloseLog();
} }
......
...@@ -83,7 +83,7 @@ static void dnodeAllocModules() { ...@@ -83,7 +83,7 @@ static void dnodeAllocModules() {
} }
} }
void dnodeCleanUpModules() { void dnodeCleanupModules() {
for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) { for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].enable && tsModule[module].stopFp) { if (tsModule[module].enable && tsModule[module].stopFp) {
(*tsModule[module].stopFp)(); (*tsModule[module].stopFp)();
......
...@@ -39,11 +39,11 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -39,11 +39,11 @@ int32_t main(int32_t argc, char *argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} else if (strcmp(argv[i], "-V") == 0) { } else if (strcmp(argv[i], "-V") == 0) {
#ifdef _SYNC #ifdef _SYNC
char *versionStr = "enterprise"; char *versionStr = "enterprise";
#else #else
char *versionStr = "community"; char *versionStr = "community";
#endif #endif
printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version); printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version);
printf("gitinfo: %s\n", gitinfo); printf("gitinfo: %s\n", gitinfo);
printf("gitinfoI: %s\n", gitinfoOfInternal); printf("gitinfoI: %s\n", gitinfoOfInternal);
...@@ -93,8 +93,6 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -93,8 +93,6 @@ int32_t main(int32_t argc, char *argv[]) {
if (dnodeInitSystem() < 0) { if (dnodeInitSystem() < 0) {
syslog(LOG_ERR, "Error initialize TDengine system"); syslog(LOG_ERR, "Error initialize TDengine system");
closelog(); closelog();
dnodeCleanUpSystem();
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
......
...@@ -34,7 +34,7 @@ typedef struct { ...@@ -34,7 +34,7 @@ typedef struct {
} SReadMsg; } SReadMsg;
typedef struct { typedef struct {
pthread_t thread; // thread pthread_t thread; // thread
int32_t workerId; // worker ID int32_t workerId; // worker ID
} SReadWorker; } SReadWorker;
...@@ -85,8 +85,8 @@ void dnodeCleanupVnodeRead() { ...@@ -85,8 +85,8 @@ void dnodeCleanupVnodeRead() {
} }
} }
taosCloseQset(readQset);
free(readPool.readWorker); free(readPool.readWorker);
taosCloseQset(readQset);
dPrint("dnode read is closed"); dPrint("dnode read is closed");
} }
...@@ -95,7 +95,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -95,7 +95,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
void *pVnode; void *pVnode;
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
...@@ -166,7 +166,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) { ...@@ -166,7 +166,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
} while (readPool.num < readPool.min); } while (readPool.num < readPool.min);
} }
dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue); dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
return queue; return queue;
} }
...@@ -177,13 +177,13 @@ void dnodeFreeVnodeRqueue(void *rqueue) { ...@@ -177,13 +177,13 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads // dynamically adjust the number of threads
} }
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg; pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode); taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册