提交 1f886b11 编写于 作者: S Shengliang Guan

refactor: make more object global

上级 69728a99
...@@ -45,7 +45,7 @@ typedef void (*SendRspFp)(const SRpcMsg* pRsp); ...@@ -45,7 +45,7 @@ typedef void (*SendRspFp)(const SRpcMsg* pRsp);
typedef void (*SendRedirectRspFp)(const SRpcMsg* pRsp, const SEpSet* pNewEpSet); typedef void (*SendRedirectRspFp)(const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc); typedef void (*ReportStartup)(const char* name, const char* desc);
typedef struct { typedef struct {
SMgmtWrapper* pWrapper; SMgmtWrapper* pWrapper;
......
...@@ -40,6 +40,11 @@ void dmCleanup(); ...@@ -40,6 +40,11 @@ void dmCleanup();
*/ */
int32_t dmRun(); int32_t dmRun();
/**
* @brief Stop dnode.
*/
void dmStop();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -71,9 +71,5 @@ void tmsgReleaseHandle(void* handle, int8_t type) { ...@@ -71,9 +71,5 @@ void tmsgReleaseHandle(void* handle, int8_t type) {
void tmsgReportStartup(const char* name, const char* desc) { void tmsgReportStartup(const char* name, const char* desc) {
ReportStartup fp = tsDefaultMsgCb.reportStartupFp; ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
if (fp != NULL && tsDefaultMsgCb.pWrapper != NULL) { (*fp)(name, desc);
(*fp)(tsDefaultMsgCb.pWrapper, name, desc);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
} }
\ No newline at end of file
...@@ -36,16 +36,10 @@ static struct { ...@@ -36,16 +36,10 @@ static struct {
char apolloUrl[PATH_MAX]; char apolloUrl[PATH_MAX];
const char **envCmd; const char **envCmd;
SArray *pArgs; // SConfigPair SArray *pArgs; // SConfigPair
SDnode *pDnode;
EDndNodeType ntype; EDndNodeType ntype;
} global = {0}; } global = {0};
static void dmStopDnode(int signum, void *info, void *ctx) { static void dmStopDnode(int signum, void *info, void *ctx) { dmStop(); }
SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode);
if (pDnode != NULL) {
dmSetEvent(pDnode, DND_EVENT_STOP);
}
}
static void dmSetSignalHandle() { static void dmSetSignalHandle() {
taosSetSignal(SIGTERM, dmStopDnode); taosSetSignal(SIGTERM, dmStopDnode);
...@@ -69,8 +63,8 @@ static void dmSetSignalHandle() { ...@@ -69,8 +63,8 @@ static void dmSetSignalHandle() {
static int32_t dmParseArgs(int32_t argc, char const *argv[]) { static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
int32_t cmdEnvIndex = 0; int32_t cmdEnvIndex = 0;
if (argc < 2) return 0; if (argc < 2) return 0;
global.envCmd = taosMemoryMalloc((argc-1)*sizeof(char*)); global.envCmd = taosMemoryMalloc((argc - 1) * sizeof(char *));
memset(global.envCmd, 0, (argc-1)*sizeof(char*)); memset(global.envCmd, 0, (argc - 1) * sizeof(char *));
for (int32_t i = 1; i < argc; ++i) { for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
...@@ -102,7 +96,8 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) { ...@@ -102,7 +96,8 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
} else if (strcmp(argv[i], "-e") == 0) { } else if (strcmp(argv[i], "-e") == 0) {
global.envCmd[cmdEnvIndex] = argv[++i]; global.envCmd[cmdEnvIndex] = argv[++i];
cmdEnvIndex++; cmdEnvIndex++;
} else if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || strcmp(argv[i], "-?")) { } else if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 ||
strcmp(argv[i], "-?")) {
global.printHelp = true; global.printHelp = true;
} else { } else {
} }
...@@ -144,23 +139,6 @@ static void dmDumpCfg() { ...@@ -144,23 +139,6 @@ static void dmDumpCfg() {
cfgDumpCfg(pCfg, 0, true); cfgDumpCfg(pCfg, 0, true);
} }
static SDnodeOpt dmGetOpt() {
SConfig *pCfg = taosGetCfg();
SDnodeOpt option = {0};
option.numOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
tstrncpy(option.dataDir, tsDataDir, sizeof(option.dataDir));
tstrncpy(option.firstEp, tsFirst, sizeof(option.firstEp));
tstrncpy(option.secondEp, tsSecond, sizeof(option.firstEp));
option.serverPort = tsServerPort;
tstrncpy(option.localFqdn, tsLocalFqdn, sizeof(option.localFqdn));
snprintf(option.localEp, sizeof(option.localEp), "%s:%u", option.localFqdn, option.serverPort);
option.disks = tsDiskCfg;
option.numOfDisks = tsDiskCfgNum;
option.ntype = global.ntype;
return option;
}
static int32_t dmInitLog() { static int32_t dmInitLog() {
char logName[12] = {0}; char logName[12] = {0};
snprintf(logName, sizeof(logName), "%slog", dmNodeLogName(global.ntype)); snprintf(logName, sizeof(logName), "%slog", dmNodeLogName(global.ntype));
...@@ -175,34 +153,6 @@ static void dmSetProcInfo(int32_t argc, char **argv) { ...@@ -175,34 +153,6 @@ static void dmSetProcInfo(int32_t argc, char **argv) {
} }
} }
static int32_t dmRunDnode() {
if (dmInit() != 0) {
dError("failed to init environment since %s", terrstr());
return -1;
}
SDnodeOpt option = dmGetOpt();
SDnode *pDnode = dmCreate(&option);
if (pDnode == NULL) {
dError("failed to to create dnode since %s", terrstr());
return -1;
} else {
global.pDnode = pDnode;
dmSetSignalHandle();
}
dInfo("start to run dnode");
int32_t code = dmRun(pDnode);
dInfo("shutting down the service");
global.pDnode = NULL;
dmClose(pDnode);
dmCleanup();
taosCloseLog();
taosCleanupCfg();
return code;
}
static void taosCleanupArgs() { static void taosCleanupArgs() {
if (global.envCmd != NULL) taosMemoryFree(global.envCmd); if (global.envCmd != NULL) taosMemoryFree(global.envCmd);
} }
...@@ -259,5 +209,17 @@ int main(int argc, char const *argv[]) { ...@@ -259,5 +209,17 @@ int main(int argc, char const *argv[]) {
dmSetProcInfo(argc, (char **)argv); dmSetProcInfo(argc, (char **)argv);
taosCleanupArgs(); taosCleanupArgs();
return dmRunDnode();
if (dmInit(global.ntype) != 0) {
dError("failed to init dnode since %s", terrstr());
return -1;
}
dInfo("start to run dnode");
dmSetSignalHandle();
int32_t code = dmRun();
dInfo("shutting down the service");
dmCleanup();
return code;
} }
...@@ -25,11 +25,11 @@ extern "C" { ...@@ -25,11 +25,11 @@ extern "C" {
#endif #endif
typedef struct SBnodeMgmt { typedef struct SBnodeMgmt {
SDnodeData *pData;
SBnode *pBnode; SBnode *pBnode;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
int32_t dnodeId;
SMultiWorker writeWorker; SMultiWorker writeWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
} SBnodeMgmt; } SBnodeMgmt;
......
...@@ -76,7 +76,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -76,7 +76,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr()); dError("failed to drop bnode since %s", terrstr());
return -1; return -1;
......
...@@ -39,9 +39,9 @@ int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -39,9 +39,9 @@ int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return -1; return -1;
} }
pMgmt->pData = pInput->pData;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.pMgmt = pMgmt;
......
...@@ -23,7 +23,6 @@ extern "C" { ...@@ -23,7 +23,6 @@ extern "C" {
#endif #endif
typedef struct SDnodeMgmt { typedef struct SDnodeMgmt {
struct SDnode *pDnode;
SDnodeData *pData; SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
......
...@@ -59,8 +59,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -59,8 +59,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.rebootTime = pMgmt->pData->rebootTime; req.rebootTime = pMgmt->pData->rebootTime;
req.updateTime = pMgmt->pData->updateTime; req.updateTime = pMgmt->pData->updateTime;
req.numOfCores = tsNumOfCores; req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = pMgmt->pData->supportVnodes; req.numOfSupportVnodes = tsNumOfSupportVnodes;
tstrncpy(req.dnodeEp, pMgmt->pData->localEp, TSDB_EP_LEN); tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0; req.clusterCfg.checkTime = 0;
......
...@@ -39,7 +39,6 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -39,7 +39,6 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return -1; return -1;
} }
pMgmt->pDnode = pInput->pDnode;
pMgmt->pData = pInput->pData; pMgmt->pData = pInput->pData;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
......
...@@ -21,9 +21,8 @@ ...@@ -21,9 +21,8 @@
SRpcMsg rsp = {0}; \ SRpcMsg rsp = {0}; \
SRpcMsg req = {.msgType = mtype}; \ SRpcMsg req = {.msgType = mtype}; \
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
tstrncpy(epset.eps[0].fqdn, pMgmt->pData->localFqdn, TSDB_FQDN_LEN); \ tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
epset.eps[0].port = pMgmt->pData->serverPort; \ epset.eps[0].port = tsServerPort; \
\
rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \ rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \
if (rsp.code == 0 && rsp.contLen > 0) { \ if (rsp.code == 0 && rsp.contLen > 0) { \
func(rsp.pCont, rsp.contLen, pInfo); \ func(rsp.pCont, rsp.contLen, pInfo); \
...@@ -40,10 +39,10 @@ static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) { ...@@ -40,10 +39,10 @@ static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) {
static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) { static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f); pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f);
pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, MNODE); pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(MNODE);
pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, QNODE); pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(QNODE);
pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, SNODE); pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(SNODE);
pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, BNODE); pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(BNODE);
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size; pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
......
...@@ -116,28 +116,28 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -116,28 +116,28 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
code = dmProcessGrantRsp(pMgmt, pMsg); code = dmProcessGrantRsp(pMgmt, pMsg);
break; break;
case TDMT_DND_CREATE_MNODE: case TDMT_DND_CREATE_MNODE:
code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, MNODE, pMsg); code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
break; break;
case TDMT_DND_DROP_MNODE: case TDMT_DND_DROP_MNODE:
code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, MNODE, pMsg); code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
break; break;
case TDMT_DND_CREATE_QNODE: case TDMT_DND_CREATE_QNODE:
code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, QNODE, pMsg); code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
break; break;
case TDMT_DND_DROP_QNODE: case TDMT_DND_DROP_QNODE:
code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, QNODE, pMsg); code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
break; break;
case TDMT_DND_CREATE_SNODE: case TDMT_DND_CREATE_SNODE:
code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, SNODE, pMsg); code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
break; break;
case TDMT_DND_DROP_SNODE: case TDMT_DND_DROP_SNODE:
code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, SNODE, pMsg); code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
break; break;
case TDMT_DND_CREATE_BNODE: case TDMT_DND_CREATE_BNODE:
code = (*pMgmt->processCreateNodeFp)(pMgmt->pDnode, BNODE, pMsg); code = (*pMgmt->processCreateNodeFp)(BNODE, pMsg);
break; break;
case TDMT_DND_DROP_BNODE: case TDMT_DND_DROP_BNODE:
code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, BNODE, pMsg); code = (*pMgmt->processDropNodeFp)(BNODE, pMsg);
break; break;
case TDMT_DND_SERVER_STATUS: case TDMT_DND_SERVER_STATUS:
code = dmProcessServerRunStatus(pMgmt, pMsg); code = dmProcessServerRunStatus(pMgmt, pMsg);
......
...@@ -24,11 +24,11 @@ extern "C" { ...@@ -24,11 +24,11 @@ extern "C" {
#endif #endif
typedef struct SMnodeMgmt { typedef struct SMnodeMgmt {
SDnodeData *pData;
SMnode *pMnode; SMnode *pMnode;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
int32_t dnodeId;
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
......
...@@ -109,7 +109,7 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -109,7 +109,7 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr()); dError("failed to drop mnode since %s", terrstr());
return -1; return -1;
...@@ -133,9 +133,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -133,9 +133,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->dnodeId != 0 && alterReq.dnodeId != pMgmt->dnodeId) { if (pMgmt->pData->dnodeId != 0 && alterReq.dnodeId != pMgmt->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->dnodeId); dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->pData->dnodeId);
return -1; return -1;
} else { } else {
return mmAlter(pMgmt, &alterReq); return mmAlter(pMgmt, &alterReq);
......
...@@ -70,7 +70,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre ...@@ -70,7 +70,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
pReplica->id = pCreate->replicas[i].id; pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port; pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pMgmt->dnodeId) { if (pReplica->id == pMgmt->pData->dnodeId) {
pOption->selfIndex = i; pOption->selfIndex = i;
} }
} }
...@@ -128,9 +128,9 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -128,9 +128,9 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return -1; return -1;
} }
pMgmt->pData = pInput->pData;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
...@@ -148,7 +148,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -148,7 +148,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (!deployed) { if (!deployed) {
dInfo("mnode start to deploy"); dInfo("mnode start to deploy");
pMgmt->dnodeId = 1; pMgmt->pData->dnodeId = 1;
mmBuildOptionForDeploy(pMgmt, pInput, &option); mmBuildOptionForDeploy(pMgmt, pInput, &option);
} else { } else {
dInfo("mnode start to open"); dInfo("mnode start to open");
...@@ -178,7 +178,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -178,7 +178,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
} }
} }
pInput->pData->dnodeId = pMgmt->dnodeId; pInput->pData->dnodeId = pMgmt->pData->dnodeId;
pOutput->pMgmt = pMgmt; pOutput->pMgmt = pMgmt;
return 0; return 0;
} }
......
...@@ -25,11 +25,11 @@ extern "C" { ...@@ -25,11 +25,11 @@ extern "C" {
#endif #endif
typedef struct SQnodeMgmt { typedef struct SQnodeMgmt {
SDnodeData *pData;
SQnode *pQnode; SQnode *pQnode;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
int32_t dnodeId;
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker fetchWorker; SSingleWorker fetchWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
......
...@@ -76,7 +76,7 @@ int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -76,7 +76,7 @@ int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr()); dError("failed to drop qnode since %s", terrstr());
return -1; return -1;
......
...@@ -39,9 +39,9 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -39,9 +39,9 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return -1; return -1;
} }
pMgmt->pData = pInput->pData;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue; pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue;
......
...@@ -25,11 +25,11 @@ extern "C" { ...@@ -25,11 +25,11 @@ extern "C" {
#endif #endif
typedef struct SSnodeMgmt { typedef struct SSnodeMgmt {
SDnodeData *pData;
SSnode *pSnode; SSnode *pSnode;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
int32_t dnodeId;
SRWLatch latch; SRWLatch latch;
int8_t uniqueWorkerInUse; int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SMultiWorker*> SArray *uniqueWorkers; // SArray<SMultiWorker*>
......
...@@ -76,7 +76,7 @@ int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -76,7 +76,7 @@ int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->dnodeId != 0 && dropReq.dnodeId != pMgmt->dnodeId) { if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr()); dError("failed to drop snode since %s", terrstr());
return -1; return -1;
......
...@@ -40,9 +40,9 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -40,9 +40,9 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return -1; return -1;
} }
pMgmt->pData = pInput->pData;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.pMgmt = pMgmt;
......
...@@ -26,10 +26,10 @@ extern "C" { ...@@ -26,10 +26,10 @@ extern "C" {
#endif #endif
typedef struct SVnodeMgmt { typedef struct SVnodeMgmt {
SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
int32_t dnodeId;
SQWorkerPool queryPool; SQWorkerPool queryPool;
SQWorkerPool fetchPool; SQWorkerPool fetchPool;
SWWorkerPool syncPool; SWWorkerPool syncPool;
......
...@@ -247,9 +247,9 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -247,9 +247,9 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt)); SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
if (pMgmt == NULL) goto _OVER; if (pMgmt == NULL) goto _OVER;
pMgmt->pData = pInput->pData;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue;
......
...@@ -65,7 +65,7 @@ typedef struct { ...@@ -65,7 +65,7 @@ typedef struct {
} SProc; } SProc;
typedef struct SMgmtWrapper { typedef struct SMgmtWrapper {
SDnode *pDnode; struct SDnode *pDnode;
SMgmtFunc func; SMgmtFunc func;
void *pMgmt; void *pMgmt;
const char *name; const char *name;
...@@ -82,12 +82,12 @@ typedef struct SMgmtWrapper { ...@@ -82,12 +82,12 @@ typedef struct SMgmtWrapper {
typedef struct { typedef struct {
EDndNodeType defaultNtype; EDndNodeType defaultNtype;
bool needCheckVgId; bool needCheckVgId;
} SMsgHandle; } SDnodeHandle;
typedef struct { typedef struct {
void *serverRpc; void *serverRpc;
void *clientRpc; void *clientRpc;
SMsgHandle msgHandles[TDMT_MAX]; SDnodeHandle msgHandles[TDMT_MAX];
} SDnodeTrans; } SDnodeTrans;
typedef struct { typedef struct {
...@@ -110,9 +110,10 @@ typedef struct SUdfdData { ...@@ -110,9 +110,10 @@ typedef struct SUdfdData {
} SUdfdData; } SUdfdData;
typedef struct SDnode { typedef struct SDnode {
int8_t once;
bool stop;
EDndProcType ptype; EDndProcType ptype;
EDndNodeType rtype; EDndNodeType rtype;
EDndEvent event;
EDndRunStatus status; EDndRunStatus status;
SStartupInfo startup; SStartupInfo startup;
SDnodeTrans trans; SDnodeTrans trans;
...@@ -123,23 +124,26 @@ typedef struct SDnode { ...@@ -123,23 +124,26 @@ typedef struct SDnode {
SMgmtWrapper wrappers[NODE_END]; SMgmtWrapper wrappers[NODE_END];
} SDnode; } SDnode;
// dmExec.c // dmEmv.c
int32_t dmOpenNode(SMgmtWrapper *pWrapper); void dmReportStartup(const char *pName, const char *pDesc);
void dmCloseNode(SMgmtWrapper *pWrapper);
// dmObj.c // dmMgmt.c
int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype);
void dmCleanupDnode(SDnode *pDnode);
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType); SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper); int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
void dmReleaseWrapper(SMgmtWrapper *pWrapper); void dmReleaseWrapper(SMgmtWrapper *pWrapper);
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper); SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
void dmSetEvent(SDnode *pDnode, EDndEvent event);
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc);
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
// dmNodes.c
int32_t dmOpenNode(SMgmtWrapper *pWrapper);
void dmCloseNode(SMgmtWrapper *pWrapper);
int32_t dmRunDnode(SDnode *pDnode);
// dmProc.c // dmProc.c
int32_t dmInitProc(struct SMgmtWrapper *pWrapper); int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
void dmCleanupProc(struct SMgmtWrapper *pWrapper); void dmCleanupProc(struct SMgmtWrapper *pWrapper);
......
...@@ -16,23 +16,10 @@ ...@@ -16,23 +16,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
static struct { static SDnode global = {0};
int8_t once;
EDndProcType ptype; static int32_t dmCheckRepeatInit(SDnode *pDnode) {
EDndNodeType rtype; if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
EDndEvent event;
EDndRunStatus status;
SStartupInfo startup;
SDnodeTrans trans;
SUdfdData udfdData;
TdThreadMutex mutex;
TdFilePtr lockfile;
SDnodeData data;
SMgmtWrapper wrappers[NODE_END];
} global;
static int32_t dmCheckRepeatInit() {
if (atomic_val_compare_exchange_8(&global.once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
dError("env is already initialized"); dError("env is already initialized");
terrno = TSDB_CODE_REPEAT_INIT; terrno = TSDB_CODE_REPEAT_INIT;
return -1; return -1;
...@@ -60,19 +47,19 @@ static int32_t dmInitMonitor() { ...@@ -60,19 +47,19 @@ static int32_t dmInitMonitor() {
return 0; return 0;
} }
int32_t dmInit() { int32_t dmInit(int8_t rtype) {
dInfo("start to init env"); dInfo("start to init env");
if (dmCheckRepeatInit() != 0) return -1; if (dmCheckRepeatInit(&global) != 0) return -1;
if (dmInitSystem() != 0) return -1; if (dmInitSystem() != 0) return -1;
if (dmInitMonitor() != 0) return -1; if (dmInitMonitor() != 0) return -1;
// if (dmInit) if (dmInitDnode(&global, rtype) != 0) return -1;
dInfo("env is initialized"); dInfo("env is initialized");
return 0; return 0;
} }
static int32_t dmCheckRepeatCleanup() { static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
if (atomic_val_compare_exchange_8(&global.once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("env is already cleaned up"); dError("env is already cleaned up");
return -1; return -1;
} }
...@@ -82,7 +69,7 @@ static int32_t dmCheckRepeatCleanup() { ...@@ -82,7 +69,7 @@ static int32_t dmCheckRepeatCleanup() {
void dmCleanup() { void dmCleanup() {
dDebug("start to cleanup env"); dDebug("start to cleanup env");
if (dmCheckRepeatCleanup != 0) return; if (dmCheckRepeatCleanup != 0) return;
dmCleanupDnode(&global);
monCleanup(); monCleanup();
syncCleanUp(); syncCleanUp();
walCleanUp(); walCleanUp();
...@@ -90,4 +77,111 @@ void dmCleanup() { ...@@ -90,4 +77,111 @@ void dmCleanup() {
udfStopUdfd(); udfStopUdfd();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dInfo("env is cleaned up"); dInfo("env is cleaned up");
taosCloseLog();
taosCleanupCfg();
}
void dmStop() {
SDnode *pDnode = &global;
pDnode->stop = true;
}
int32_t dmRun() {
SDnode *pDnode = &global;
return dmRunDnode(pDnode);
}
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
SDnode *pDnode = &global;
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dmReleaseWrapper(pWrapper);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
dError("failed to create node since %s", terrstr());
return -1;
}
taosThreadMutexLock(&pDnode->mutex);
pWrapper = &pDnode->wrappers[ntype];
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
return -1;
}
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
if (code != 0) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been created", pWrapper->name);
(void)dmOpenNode(pWrapper);
pWrapper->required = true;
pWrapper->deployed = true;
pWrapper->proc.ptype = pDnode->ptype;
}
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
SDnode *pDnode = &global;
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
dError("failed to drop node since %s", terrstr());
return -1;
}
taosThreadMutexLock(&pDnode->mutex);
int32_t code = (*pWrapper->func.dropFp)(pWrapper->pMgmt, pMsg);
if (code != 0) {
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been dropped", pWrapper->name);
pWrapper->required = false;
pWrapper->deployed = false;
}
dmReleaseWrapper(pWrapper);
if (code == 0) {
dmCloseNode(pWrapper);
taosRemoveDir(pWrapper->path);
}
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
static bool dmIsNodeRequired(EDndNodeType ntype) {
SDnode *pDnode = &global;
return pDnode->wrappers[ntype].required;
}
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
SMgmtInputOpt opt = {
.path = pWrapper->path,
.name = pWrapper->name,
.pData = &pWrapper->pDnode->data,
.processCreateNodeFp = dmProcessCreateNodeReq,
.processDropNodeFp = dmProcessDropNodeReq,
.isNodeRequiredFp = dmIsNodeRequired,
};
opt.msgCb = dmGetMsgcb(pWrapper);
return opt;
}
void dmReportStartup(const char *pName, const char *pDesc) {
SStartupInfo *pStartup = &global.startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
} }
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
static bool dmIsNodeRequired(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; }
static bool dmRequireNode(SMgmtWrapper *pWrapper) { static bool dmRequireNode(SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
...@@ -39,8 +37,8 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) { ...@@ -39,8 +37,8 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) {
return required; return required;
} }
static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) {
pDnode->rtype = pOption->ntype; pDnode->rtype = rtype;
if (tsMultiProcess == 0) { if (tsMultiProcess == 0) {
pDnode->ptype = DND_PROC_SINGLE; pDnode->ptype = DND_PROC_SINGLE;
...@@ -65,21 +63,6 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -65,21 +63,6 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pData->rebootTime = taosGetTimestampMs(); pData->rebootTime = taosGetTimestampMs();
pData->dropped = 0; pData->dropped = 0;
pData->stopped = 0; pData->stopped = 0;
pData->localEp = strdup(pOption->localEp);
pData->localFqdn = strdup(pOption->localFqdn);
pData->firstEp = strdup(pOption->firstEp);
pData->secondEp = strdup(pOption->secondEp);
pData->supportVnodes = pOption->numOfSupportVnodes;
pData->serverPort = pOption->serverPort;
pData->numOfDisks = pOption->numOfDisks;
pData->disks = pOption->disks;
pData->dataDir = strdup(pOption->dataDir);
if (pData->dataDir == NULL || pData->localEp == NULL || pData->localFqdn == NULL || pData->firstEp == NULL ||
pData->secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pData->dnodeHash == NULL) { if (pData->dnodeHash == NULL) {
...@@ -126,30 +109,17 @@ static void dmClearVars(SDnode *pDnode) { ...@@ -126,30 +109,17 @@ static void dmClearVars(SDnode *pDnode) {
} }
taosWUnLockLatch(&pData->latch); taosWUnLockLatch(&pData->latch);
taosMemoryFreeClear(pData->localEp);
taosMemoryFreeClear(pData->localFqdn);
taosMemoryFreeClear(pData->firstEp);
taosMemoryFreeClear(pData->secondEp);
taosMemoryFreeClear(pData->dataDir);
taosThreadMutexDestroy(&pDnode->mutex); taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
taosMemoryFree(pDnode); taosMemoryFree(pDnode);
} }
SDnode *dmCreate(const SDnodeOpt *pOption) { int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
dInfo("start to create dnode"); dInfo("start to create dnode");
int32_t code = -1; int32_t code = -1;
char path[PATH_MAX + 100] = {0}; char path[PATH_MAX + 100] = {0};
SDnode *pDnode = NULL;
pDnode = taosMemoryCalloc(1, sizeof(SDnode));
if (pDnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dmInitVars(pDnode, pOption) != 0) { if (dmInitVars(pDnode, rtype) != 0) {
goto _OVER; goto _OVER;
} }
...@@ -162,7 +132,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { ...@@ -162,7 +132,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
pWrapper->pDnode = pDnode;
pWrapper->name = dmNodeName(ntype); pWrapper->name = dmNodeName(ntype);
pWrapper->ntype = ntype; pWrapper->ntype = ntype;
pWrapper->proc.wrapper = pWrapper; pWrapper->proc.wrapper = pWrapper;
...@@ -174,7 +143,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { ...@@ -174,7 +143,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
} }
taosInitRWLatch(&pWrapper->latch); taosInitRWLatch(&pWrapper->latch);
snprintf(path, sizeof(path), "%s%s%s", pOption->dataDir, TD_DIRSEP, pWrapper->name); snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path); pWrapper->path = strdup(path);
if (pWrapper->path == NULL) { if (pWrapper->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -195,7 +164,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { ...@@ -195,7 +164,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
} }
if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) { if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) {
pDnode->lockfile = dmCheckRunning(pOption->dataDir); pDnode->lockfile = dmCheckRunning(tsDataDir);
if (pDnode->lockfile == NULL) { if (pDnode->lockfile == NULL) {
goto _OVER; goto _OVER;
} }
...@@ -210,7 +179,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { ...@@ -210,7 +179,7 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER; goto _OVER;
} }
dmReportStartup(pDnode, "dnode-transport", "initialized"); dmReportStartup("dnode-transport", "initialized");
dInfo("dnode is created, ptr:%p", pDnode); dInfo("dnode is created, ptr:%p", pDnode);
code = 0; code = 0;
...@@ -221,10 +190,10 @@ _OVER: ...@@ -221,10 +190,10 @@ _OVER:
dError("failed to create dnode since %s", terrstr()); dError("failed to create dnode since %s", terrstr());
} }
return pDnode; return code;
} }
void dmClose(SDnode *pDnode) { void dmCleanupDnode(SDnode *pDnode) {
if (pDnode == NULL) return; if (pDnode == NULL) return;
dmCleanupClient(pDnode); dmCleanupClient(pDnode);
...@@ -240,12 +209,6 @@ void dmSetStatus(SDnode *pDnode, EDndRunStatus status) { ...@@ -240,12 +209,6 @@ void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
} }
} }
void dmSetEvent(SDnode *pDnode, EDndEvent event) {
if (event == DND_EVENT_STOP) {
pDnode->event = event;
}
}
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper; SMgmtWrapper *pRetWrapper = pWrapper;
...@@ -288,17 +251,6 @@ void dmReleaseWrapper(SMgmtWrapper *pWrapper) { ...@@ -288,17 +251,6 @@ void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount); dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount);
} }
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupInfo *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
}
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) {
dmReportStartup(pWrapper->pDnode, pName, pDesc);
}
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
SDnodeMgmt *pMgmt = pDnode->wrappers[DNODE].pMgmt; SDnodeMgmt *pMgmt = pDnode->wrappers[DNODE].pMgmt;
pStatus->details[0] = 0; pStatus->details[0] = 0;
...@@ -315,7 +267,7 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) ...@@ -315,7 +267,7 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus)
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("net test req is received"); dDebug("net test req is received");
SRpcMsg rsp = {.info = pReq->info, .code = 0}; SRpcMsg rsp = {.code = 0, .info = pReq->info};
rsp.pCont = rpcMallocCont(pReq->contLen); rsp.pCont = rpcMallocCont(pReq->contLen);
if (rsp.pCont == NULL) { if (rsp.pCont == NULL) {
rsp.code = TSDB_CODE_OUT_OF_MEMORY; rsp.code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -353,82 +305,3 @@ _OVER: ...@@ -353,82 +305,3 @@ _OVER:
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pReq->pCont); rpcFreeCont(pReq->pCont);
} }
static int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dmReleaseWrapper(pWrapper);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
dError("failed to create node since %s", terrstr());
return -1;
}
taosThreadMutexLock(&pDnode->mutex);
pWrapper = &pDnode->wrappers[ntype];
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
return -1;
}
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
if (code != 0) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been created", pWrapper->name);
(void)dmOpenNode(pWrapper);
pWrapper->required = true;
pWrapper->deployed = true;
pWrapper->proc.ptype = pDnode->ptype;
}
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
static int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
dError("failed to drop node since %s", terrstr());
return -1;
}
taosThreadMutexLock(&pDnode->mutex);
int32_t code = (*pWrapper->func.dropFp)(pWrapper->pMgmt, pMsg);
if (code != 0) {
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been dropped", pWrapper->name);
pWrapper->required = false;
pWrapper->deployed = false;
}
dmReleaseWrapper(pWrapper);
if (code == 0) {
dmCloseNode(pWrapper);
taosRemoveDir(pWrapper->path);
}
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
SMgmtInputOpt opt = {
.pDnode = pWrapper->pDnode,
.pData = &pWrapper->pDnode->data,
.processCreateNodeFp = dmProcessCreateNodeReq,
.processDropNodeFp = dmProcessDropNodeReq,
.isNodeRequiredFp = dmIsNodeRequired,
.name = pWrapper->name,
.path = pWrapper->path,
};
opt.msgCb = dmGetMsgcb(pWrapper);
return opt;
}
\ No newline at end of file
...@@ -136,7 +136,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { ...@@ -136,7 +136,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
pWrapper->pMgmt = output.pMgmt; pWrapper->pMgmt = output.pMgmt;
} }
dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned"); dmReportStartup(pWrapper->name, "openned");
return 0; return 0;
} }
...@@ -151,7 +151,7 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) { ...@@ -151,7 +151,7 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, has been started", pWrapper->name); dDebug("node:%s, has been started", pWrapper->name);
} }
dmReportStartup(pWrapper->pDnode, pWrapper->name, "started"); dmReportStartup(pWrapper->name, "started");
return 0; return 0;
} }
...@@ -221,7 +221,7 @@ static int32_t dmStartNodes(SDnode *pDnode) { ...@@ -221,7 +221,7 @@ static int32_t dmStartNodes(SDnode *pDnode) {
} }
dInfo("TDengine initialized successfully"); dInfo("TDengine initialized successfully");
dmReportStartup(pDnode, "TDengine", "initialized successfully"); dmReportStartup("TDengine", "initialized successfully");
return 0; return 0;
} }
...@@ -260,7 +260,7 @@ static void dmWatchNodes(SDnode *pDnode) { ...@@ -260,7 +260,7 @@ static void dmWatchNodes(SDnode *pDnode) {
taosThreadMutexUnlock(&pDnode->mutex); taosThreadMutexUnlock(&pDnode->mutex);
} }
int32_t dmRun(SDnode *pDnode) { int32_t dnRunDnode(SDnode *pDnode) {
if (dmOpenNodes(pDnode) != 0) { if (dmOpenNodes(pDnode) != 0) {
dError("failed to open nodes since %s", terrstr()); dError("failed to open nodes since %s", terrstr());
return -1; return -1;
...@@ -272,7 +272,7 @@ int32_t dmRun(SDnode *pDnode) { ...@@ -272,7 +272,7 @@ int32_t dmRun(SDnode *pDnode) {
} }
while (1) { while (1) {
if (pDnode->event & DND_EVENT_STOP) { if (!pDnode->stop) {
dInfo("dnode is about to stop"); dInfo("dnode is about to stop");
dmSetStatus(pDnode, DND_STAT_STOPPED); dmSetStatus(pDnode, DND_STAT_STOPPED);
dmStopNodes(pDnode); dmStopNodes(pDnode);
......
...@@ -52,7 +52,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -52,7 +52,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t code = -1; int32_t code = -1;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
bool needRelease = false; bool needRelease = false;
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
SMgmtWrapper *pWrapper = NULL; SMgmtWrapper *pWrapper = NULL;
dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
...@@ -178,7 +178,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -178,7 +178,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i); SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) { if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId; pHandle->needCheckVgId = pMgmt->needCheckVgId;
} }
...@@ -201,7 +201,7 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { ...@@ -201,7 +201,7 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) { if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
epSet.inUse = (i + 1) % epSet.numOfEps; epSet.inUse = (i + 1) % epSet.numOfEps;
} }
...@@ -410,8 +410,8 @@ int32_t dmInitServer(SDnode *pDnode) { ...@@ -410,8 +410,8 @@ int32_t dmInitServer(SDnode *pDnode) {
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn)); strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
rpcInit.localPort = pDnode->data.serverPort; rpcInit.localPort = tsServerPort;
rpcInit.label = "DND"; rpcInit.label = "DND";
rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
...@@ -449,7 +449,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { ...@@ -449,7 +449,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.sendRedirectRspFp = dmSendRedirectRsp, .sendRedirectRspFp = dmSendRedirectRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle, .releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartupByWrapper, .reportStartupFp = dmReportStartup,
}; };
return msgCb; return msgCb;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册