提交 abb7731a 编写于 作者: S slguan

#1177

上级 2f7883b1
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODEMGMT_H
#define TDENGINE_VNODEMGMT_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
char id[20];
char sid;
void *thandle;
int mgmtIndex;
char status; // 0:offline, 1:online
} SMgmtObj;
int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEMGMT_H
...@@ -13,44 +13,32 @@ ...@@ -13,44 +13,32 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE #ifndef TDENGINE_DNODE_PLUGIN_H
#include "mgmt.h" #define TDENGINE_DNODE_PLUGIN_H
#include "dnodeSystem.h"
extern SModule tsModule[TSDB_MOD_MAX];
int taosCreateTierDirectory() {
struct stat dirstat;
strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) {
mkdir(dataDir, 0755);
}
char fileName[128]; #ifdef __cplusplus
extern "C" {
#endif
sprintf(fileName, "%s/tsdb", tsDirectory); #include <stdint.h>
mkdir(fileName, 0755); #include <pthread.h>
sprintf(fileName, "%s/data", tsDirectory); #include "tsched.h"
mkdir(fileName, 0755); #include "mgmt.h"
sprintf(mgmtDirectory, "%s/mgmt", tsDirectory);
sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDbRunning(dataDir);
return 0; char *(*taosBuildRspMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
} char *(*taosBuildReqMsgToMnodeWithSize)(SMgmtObj *pObj, char type, int size);
char *(*taosBuildRspMsgToMnode)(SMgmtObj *pObj, char type);
char *(*taosBuildReqMsgToMnode)(SMgmtObj *pObj, char type);
int (*taosSendMsgToMnode)(SMgmtObj *pObj, char *msg, int msgLen);
int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code);
int dnodeInitSystemSpec() { return 0; } void (*dnodeInitMgmtIp)();
void (*dnodeProcessMsgFromMgmt)(SSchedMsg *sched);
int (*dnodeInitMgmtConn)();
void dnodeStartModuleSpec() { #ifdef __cplusplus
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].startFp) {
if ((*tsModule[mod].startFp)() != 0) {
dError("failed to start module:%d", mod);
}
}
}
} }
#endif
void dnodeParseParameterK() {} #endif
\ No newline at end of file
...@@ -56,7 +56,7 @@ void dnodeCheckDbRunning(const char* dir); ...@@ -56,7 +56,7 @@ void dnodeCheckDbRunning(const char* dir);
void vnodeCleanUpSystem(); void vnodeCleanUpSystem();
int vnodeInitSystem(); int vnodeInitSystem();
void vnodeInitMgmtIp(); void dnodeInitMgmtIp();
void vnodeInitQHandle(); void vnodeInitQHandle();
int mgmtInitSystem(); int mgmtInitSystem();
......
...@@ -51,7 +51,7 @@ char *taosBuildReqMsgToMnode(SMgmtObj *pObj, char type); ...@@ -51,7 +51,7 @@ char *taosBuildReqMsgToMnode(SMgmtObj *pObj, char type);
int taosSendSimpleRspToMnode(SMgmtObj *pObj, char rsptype, char code); int taosSendSimpleRspToMnode(SMgmtObj *pObj, char rsptype, char code);
int taosSendMsgToMnode(SMgmtObj *pObj, char *msg, int msgLen); int taosSendMsgToMnode(SMgmtObj *pObj, char *msg, int msgLen);
void vnodeProcessMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj) { void dnodeProcessMsgFromMgmtImp(char *content, int msgLen, int msgType, SMgmtObj *pObj) {
if (msgType == TSDB_MSG_TYPE_CREATE) { if (msgType == TSDB_MSG_TYPE_CREATE) {
vnodeProcessCreateMeterRequest(content, msgLen, pObj); vnodeProcessCreateMeterRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_VPEERS) { } else if (msgType == TSDB_MSG_TYPE_VPEERS) {
......
...@@ -14,14 +14,16 @@ ...@@ -14,14 +14,16 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tsched.h"
#include "dnodePlugin.h"
#include "dnodeMgmt.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeMgmt.h" #include "vnodeMgmt.h"
void*vnodeProcessMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj);
void mgmtProcessMsgFromDnodeSpec(SSchedMsg *sched);
char *taosBuildRspMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size) { char *taosBuildRspMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size); char *pStart = (char *)malloc(size);
if (pStart == NULL) { if (pStart == NULL) {
return NULL; return NULL;
...@@ -31,7 +33,7 @@ char *taosBuildRspMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size) { ...@@ -31,7 +33,7 @@ char *taosBuildRspMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size) {
return pStart + 1; return pStart + 1;
} }
char *taosBuildReqMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size) { char *taosBuildReqMsgToMnodeWithSizeEdgeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size); char *pStart = (char *)malloc(size);
if (pStart == NULL) { if (pStart == NULL) {
return NULL; return NULL;
...@@ -41,15 +43,15 @@ char *taosBuildReqMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size) { ...@@ -41,15 +43,15 @@ char *taosBuildReqMsgToMnodeWithSize(SMgmtObj *pObj, char type, int size) {
return pStart + 1; return pStart + 1;
} }
char *taosBuildRspMsgToMnode(SMgmtObj *pObj, char type) { char *taosBuildRspMsgToMnodeEdgeImp(SMgmtObj *pObj, char type) {
return taosBuildRspMsgToMnodeWithSize(pObj, type, 256); return taosBuildRspMsgToMnodeWithSize(pObj, type, 256);
} }
char *taosBuildReqMsgToMnode(SMgmtObj *pObj, char type) { char *taosBuildReqMsgToMnodeEdgeImp(SMgmtObj *pObj, char type) {
return taosBuildReqMsgToMnodeWithSize(pObj, type, 256); return taosBuildReqMsgToMnodeWithSize(pObj, type, 256);
} }
int taosSendMsgToMnode(SMgmtObj *pObj, char *msg, int msgLen) { int taosSendMsgToMnodeEdgeImp(SMgmtObj *pObj, char *msg, int msgLen) {
dTrace("msg:%s is sent to mnode", taosMsg[(uint8_t)(*(msg-1))]); dTrace("msg:%s is sent to mnode", taosMsg[(uint8_t)(*(msg-1))]);
/* /*
...@@ -65,7 +67,7 @@ int taosSendMsgToMnode(SMgmtObj *pObj, char *msg, int msgLen) { ...@@ -65,7 +67,7 @@ int taosSendMsgToMnode(SMgmtObj *pObj, char *msg, int msgLen) {
return 0; return 0;
} }
int taosSendSimpleRspToMnode(SMgmtObj *pObj, char rsptype, char code) { int taosSendSimpleRspToMnodeEdgeImp(SMgmtObj *pObj, char rsptype, char code) {
char *pStart = taosBuildRspMsgToMnode(0, rsptype); char *pStart = taosBuildRspMsgToMnode(0, rsptype);
if (pStart == NULL) { if (pStart == NULL) {
return 0; return 0;
...@@ -77,19 +79,35 @@ int taosSendSimpleRspToMnode(SMgmtObj *pObj, char rsptype, char code) { ...@@ -77,19 +79,35 @@ int taosSendSimpleRspToMnode(SMgmtObj *pObj, char rsptype, char code) {
return 0; return 0;
} }
void vnodeProcessMsgFromMgmtSpec(SSchedMsg *sched) { void dnodeProcessMsgFromMgmtEdgeImp(SSchedMsg *sched) {
char msgType = *sched->msg; char msgType = *sched->msg;
char *content = sched->msg + 1; char *content = sched->msg + 1;
dTrace("msg:%s is received from mgmt", taosMsg[(uint8_t)msgType]); dTrace("msg:%s is received from mgmt", taosMsg[(uint8_t)msgType]);
vnodeProcessMsgFromMgmt(content, 0, msgType, 0); dnodeProcessMsgFromMgmtImp(content, 0, msgType, 0);
free(sched->msg); free(sched->msg);
} }
int vnodeInitMgmt() { return 0; } int dnodeInitMgmtConnEdgeImp() {
return 0;
}
void dnodeInitMgmtIpEdgeImp() {}
void dnodeInitPlugin() {
dnodeInitMgmtConn = dnodeInitMgmtConnEdgeImp;
dnodeInitMgmtIp = dnodeInitMgmtIpEdgeImp;
dnodeProcessMsgFromMgmt = dnodeProcessMsgFromMgmtEdgeImp;
taosBuildRspMsgToMnodeWithSize = taosBuildRspMsgToMnodeWithSizeEdgeImp;
taosBuildReqMsgToMnodeWithSize = taosBuildReqMsgToMnodeWithSizeEdgeImp;
taosBuildRspMsgToMnode = taosBuildRspMsgToMnodeEdgeImp;
taosBuildReqMsgToMnode = taosBuildReqMsgToMnodeEdgeImp;
taosSendMsgToMnode = taosSendMsgToMnodeEdgeImp;
taosSendSimpleRspToMnode = taosSendSimpleRspToMnodeEdgeImp;
}
void vnodeInitMgmtIp() {}
int vnodeSaveCreateMsgIntoQueue(SVnodeObj *pVnode, char *pMsg, int msgLen) { return 0; }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
...@@ -133,7 +133,7 @@ int dnodeInitSystem() { ...@@ -133,7 +133,7 @@ int dnodeInitSystem() {
return -1; return -1;
} }
vnodeInitMgmtIp(); dnodeInitMgmtIp();
tsPrintGlobalConfig(); tsPrintGlobalConfig();
dPrint("Server IP address is:%s", tsPrivateIp); dPrint("Server IP address is:%s", tsPrivateIp);
...@@ -216,3 +216,44 @@ void dnodeCountRequest(SCountInfo *info) { ...@@ -216,3 +216,44 @@ void dnodeCountRequest(SCountInfo *info) {
info->selectReqNum = atomic_exchange_32(&vnodeSelectReqNum, 0); info->selectReqNum = atomic_exchange_32(&vnodeSelectReqNum, 0);
info->insertReqNum = atomic_exchange_32(&vnodeInsertReqNum, 0); info->insertReqNum = atomic_exchange_32(&vnodeInsertReqNum, 0);
} }
//spec
extern SModule tsModule[TSDB_MOD_MAX];
int taosCreateTierDirectory() {
struct stat dirstat;
strcpy(tsDirectory, dataDir);
if (stat(dataDir, &dirstat) < 0) {
mkdir(dataDir, 0755);
}
char fileName[128];
sprintf(fileName, "%s/tsdb", tsDirectory);
mkdir(fileName, 0755);
sprintf(fileName, "%s/data", tsDirectory);
mkdir(fileName, 0755);
sprintf(mgmtDirectory, "%s/mgmt", tsDirectory);
sprintf(tsDirectory, "%s/tsdb", dataDir);
dnodeCheckDbRunning(dataDir);
return 0;
}
int dnodeInitSystemSpec() { return 0; }
void dnodeStartModuleSpec() {
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].startFp) {
if ((*tsModule[mod].startFp)() != 0) {
dError("failed to start module:%d", mod);
}
}
}
}
void dnodeParseParameterK() {}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
...@@ -80,7 +80,7 @@ int vnodeInitSystem() { ...@@ -80,7 +80,7 @@ int vnodeInitSystem() {
return -1; return -1;
} }
if (vnodeInitMgmt() < 0) { if (dnodeInitMgmtConn() < 0) {
dError("failed to init communication to mgmt"); dError("failed to init communication to mgmt");
return -1; return -1;
} }
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
extern void *dmQhandle; extern void *dmQhandle;
void * mgmtStatusTimer = NULL; void * mgmtStatusTimer = NULL;
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj); void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj);
void vnodeProcessMsgFromMgmtSpec(SSchedMsg *sched); void dnodeProcessMsgFromMgmt(SSchedMsg *sched);
char *taosBuildRspMsgToDnodeWithSize(SDnodeObj *pObj, char type, int size) { char *taosBuildRspMsgToDnodeWithSize(SDnodeObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size); char *pStart = (char *)malloc(size);
...@@ -67,7 +67,7 @@ int taosSendMsgToDnode(SDnodeObj *pObj, char *msg, int msgLen) { ...@@ -67,7 +67,7 @@ int taosSendMsgToDnode(SDnodeObj *pObj, char *msg, int msgLen) {
* Lite version has no message header, so minus one * Lite version has no message header, so minus one
*/ */
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.fp = vnodeProcessMsgFromMgmtSpec; schedMsg.fp = dnodeProcessMsgFromMgmt;
schedMsg.msg = msg - 1; schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL; schedMsg.ahandle = NULL;
schedMsg.thandle = NULL; schedMsg.thandle = NULL;
......
...@@ -524,7 +524,7 @@ SConnSec *vnodeGetMeterSec(int vnode, int sid); ...@@ -524,7 +524,7 @@ SConnSec *vnodeGetMeterSec(int vnode, int sid);
int vnodeCreateMeterObjFile(int vnode); int vnodeCreateMeterObjFile(int vnode);
// mgmt // mgmt
int vnodeInitMgmt(); int dnodeInitMgmtConn();
void vnodeCleanUpMgmt(); void vnodeCleanUpMgmt();
......
...@@ -32,7 +32,7 @@ int vnodeInitStore(); ...@@ -32,7 +32,7 @@ int vnodeInitStore();
int vnodeInitPeer(int numOfThreads); int vnodeInitPeer(int numOfThreads);
int vnodeInitMgmt(); int dnodeInitMgmtConn();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册