提交 e64422b6 编写于 作者: S Shengliang Guan

shm

上级 ca467fd7
......@@ -5,6 +5,7 @@ aux_source_directory(bnode/src DNODE_SRC)
aux_source_directory(snode/src DNODE_SRC)
aux_source_directory(vnode/src DNODE_SRC)
aux_source_directory(mnode/src DNODE_SRC)
aux_source_directory(container/src DNODE_SRC)
add_library(dnode STATIC ${DNODE_SRC})
target_link_libraries(
......@@ -19,6 +20,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/snode/inc"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/vnode/inc"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/mnode/inc"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/container/inc"
)
add_subdirectory(exec)
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
// #include "dndBnode.h"
// #include "dndMgmt.h"
// #include "dmMgmt.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
......
......@@ -69,8 +69,7 @@ typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg
typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg);
typedef SMgmtWrapper *(*OpenNodeFp)(SDnode *pDnode, const char *path);
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
typedef void (*CloseNodeFp)(SDnode *pDnode, SMgmtWrapper *pWrapper);
typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper);
typedef int32_t (*MgmtHandleMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg);
......@@ -254,7 +253,7 @@ SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) ;
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
SMgmtFp dndGetMgmtFp();
SMgmtFp dmGetMgmtFp();
#ifdef __cplusplus
}
......
......@@ -24,6 +24,7 @@ extern "C" {
int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode);
int32_t dndInitServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode);
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "dndHandle.h"
#include "dmHandle.h"
#include "dndTransport.h"
#include "vmInt.h"
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "dndMain.h"
#include "dndMgmt.h"
#include "dmMgmt.h"
#include "dndTransport.h"
#include "bmInt.h"
......@@ -57,36 +57,9 @@ static void dndClearMemory(SDnode *pDnode) {
}
static int32_t dndInitResource(SDnode *pDnode) {
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
SDiskCfg *pDisks = pDnode->cfg.pDisks;
int32_t numOfDisks = pDnode->cfg.numOfDisks;
if (numOfDisks <= 0 || pDisks == NULL) {
pDisks = &dCfg;
numOfDisks = 1;
}
pDnode->pTfs = tfsOpen(pDisks, numOfDisks);
if (pDnode->pTfs == NULL) {
dError("failed to init tfs since %s", terrstr());
return -1;
}
if (dndInitMgmt(pDnode) != 0) {
dError("failed to init mgmt since %s", terrstr());
return -1;
}
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
dndSetStatus(pDnode, DND_STAT_RUNNING);
dndSendStatusReq(pDnode);
dndReportStartup(pDnode, "TDengine", "initialized successfully");
return 0;
}
......@@ -120,7 +93,7 @@ SDnode *dndCreate(SDndCfg *pCfg) {
goto _OVER;
}
pDnode->wrappers[DNODE].fp = dndGetMgmtFp();
pDnode->wrappers[DNODE].fp = dmGetMgmtFp();
pDnode->wrappers[MNODE].fp = mmGetMgmtFp();
pDnode->wrappers[VNODES].fp = vmGetMgmtFp();
pDnode->wrappers[QNODE].fp = qmGetMgmtFp();
......@@ -216,7 +189,7 @@ static int32_t dndOpenNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
// return 0;
pWrapper->pMgmt = (*pWrapper->fp.openFp)(pDnode, pWrapper->path);
(*pWrapper->fp.openFp)(pWrapper);
return 0;
}
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "dndMonitor.h"
#include "dndMgmt.h"
#include "dmMgmt.h"
static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "dndTransport.h"
#include "dndMgmt.h"
#include "dmMgmt.h"
#include "mmInt.h"
#define INTERNAL_USER "_dnd"
......@@ -207,7 +207,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return rpcRsp.code;
}
static int32_t dndInitServer(SDnode *pDnode) {
int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t dndReadFile(SDnode *pDnode);
int32_t dndWriteFile(SDnode *pDnode);
int32_t dmReadFile(SDnode *pDnode);
int32_t dmWriteFile(SDnode *pDnode);
void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps);
void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps);
......
......@@ -23,7 +23,11 @@ extern "C" {
#endif
void dndInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void dndSendStatusReq(SDnode *pDnode);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_INT_H_
#define _TD_DND_DNODE_INT_H_
#include "dndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
SMgmtFp dmGetMgmtFp();
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_INT_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_WORKER_H_
#define _TD_DND_DNODE_WORKER_H_
#include "dmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t dmStartWorker();
void dmStopWorker();
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_WORKER_H_*/
\ No newline at end of file
......@@ -14,9 +14,9 @@
*/
#define _DEFAULT_SOURCE
#include "dndFile.h"
#include "dmFile.h"
int32_t dndReadFile(SDnode *pDnode) {
int32_t dmReadFile(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
......@@ -153,7 +153,7 @@ PRASE_DNODE_OVER:
return 0;
}
int32_t dndWriteFile(SDnode *pDnode) {
int32_t dmWriteFile(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
char file[PATH_MAX];
......@@ -220,12 +220,12 @@ void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) {
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
if (numOfEps != numOfEpsOld) {
dndResetDnodes(pDnode, pDnodeEps);
dndWriteFile(pDnode);
dmWriteFile(pDnode);
} else {
int32_t size = numOfEps * sizeof(SDnodeEp);
if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) {
dndResetDnodes(pDnode, pDnodeEps);
dndWriteFile(pDnode);
dmWriteFile(pDnode);
}
}
......
......@@ -14,9 +14,9 @@
*/
#define _DEFAULT_SOURCE
#include "dndHandle.h"
#include "dmHandle.h"
#include "dndWorker.h"
#include "dndMgmt.h"
#include "dmMgmt.h"
static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
......@@ -52,7 +52,7 @@ void dndInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dndProcessMgmtMsg);
}
SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
......@@ -14,9 +14,11 @@
*/
#define _DEFAULT_SOURCE
#include "dndMgmt.h"
#include "dndHandle.h"
#include "dmMgmt.h"
#include "dmWorker.h"
// #include "dmMgmt.h"
#include "dmFile.h"
#include "dmHandle.h"
#include "dndMonitor.h"
// #include "dndBnode.h"
// #include "mm.h"
......@@ -30,9 +32,6 @@
#if 0
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndReadFile(SDnode *pDnode);
static int32_t dndWriteFile(SDnode *pDnode);
static void *dnodeThreadRoutine(void *param);
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq);
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
......@@ -164,7 +163,7 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
dndWriteFile(pDnode);
dmWriteFile(pDnode);
taosWUnLockLatch(&pMgmt->latch);
}
}
......@@ -176,7 +175,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1;
dndWriteFile(pDnode);
dmWriteFile(pDnode);
}
} else {
SStatusRsp statusRsp = {0};
......@@ -216,84 +215,6 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
rpcSendResponse(&rpcRsp);
}
static void *dnodeThreadRoutine(void *param) {
SDnode *pDnode = param;
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int64_t lastStatusTime = taosGetTimestampMs();
int64_t lastMonitorTime = lastStatusTime;
setThreadName("dnode-hb");
while (true) {
pthread_testcancel();
taosMsleep(200);
if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) {
continue;
}
int64_t curTime = taosGetTimestampMs();
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
dndSendStatusReq(pDnode);
lastStatusTime = curTime;
}
float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
if (monitorInterval >= tsMonitorInterval) {
dndSendMonitorReport(pDnode);
lastMonitorTime = curTime;
}
}
}
int32_t dndInitMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->dnodeId = 0;
pMgmt->rebootTime = taosGetTimestampMs();
pMgmt->dropped = 0;
pMgmt->clusterId = 0;
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndReadFile(pDnode) != 0) {
dError("failed to read file:%s since %s", pMgmt->file, terrstr());
return -1;
}
if (pMgmt->dropped) {
dError("dnode not start since its already dropped");
return -1;
}
if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
if (pMgmt->threadId == NULL) {
dError("failed to init dnode thread");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dInfo("dnode-mgmt is initialized");
return 0;
}
void dndStopMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupWorker(&pMgmt->mgmtWorker);
......@@ -431,7 +352,6 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t dndInitMgmt(SDnode *pDnode) {return 0;}
void dndStopMgmt(SDnode *pDnode) {}
void dndCleanupMgmt(SDnode *pDnode){}
......@@ -446,13 +366,80 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq){}
void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
bool dndRequireNode(SMgmtWrapper *pWrapper) { return true; }
static int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
pMgmt->dnodeId = 0;
pMgmt->rebootTime = taosGetTimestampMs();
pMgmt->dropped = 0;
pMgmt->clusterId = 0;
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) {
dError("node:%s, failed to init dnode hash", pWrapper->name);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadFile(pWrapper->pDnode) != 0) {
dError("node:%s, failed to read file since %s", pWrapper->name, terrstr());
return -1;
}
if (pMgmt->dropped) {
dError("node:%s, will not start since its already dropped", pWrapper->name);
return -1;
}
if (dmStartWorker(pMgmt) != 0) {
dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr());
return -1;
}
dInfo("dnode-mgmt is initialized");
return 0;
// dndSetStatus(pDnode, DND_STAT_RUNNING);
// dndSendStatusReq(pDnode);
// dndReportStartup(pDnode, "TDengine", "initialized successfully");
#if 0
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
SDiskCfg *pDisks = pDnode->cfg.pDisks;
int32_t numOfDisks = pDnode->cfg.numOfDisks;
if (numOfDisks <= 0 || pDisks == NULL) {
pDisks = &dCfg;
numOfDisks = 1;
}
pDnode->pTfs = tfsOpen(pDisks, numOfDisks);
if (pDnode->pTfs == NULL) {
dError("failed to init tfs since %s", terrstr());
return -1;
}
#endif
}
static void dmCleanup(SDnode *pDnode, SMgmtWrapper *pWrapper){
}
static bool dmRequire(SMgmtWrapper *pWrapper) { return true; }
SMgmtFp dndGetMgmtFp() {
SMgmtFp dmGetMgmtFp() {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = dndRequireNode;
mgmtFp.getMsgHandleFp = dndGetMsgHandle;
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
mgmtFp.requiredFp = dmRequire;
mgmtFp.getMsgHandleFp = dmGetMsgHandle;
return mgmtFp;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmWorker.h"
#include "dndWorker.h"
#include "dmHandle.h"
static void *dnodeThreadRoutine(void *param) {
SDnode *pDnode = param;
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int64_t lastStatusTime = taosGetTimestampMs();
int64_t lastMonitorTime = lastStatusTime;
setThreadName("dnode-hb");
while (true) {
pthread_testcancel();
taosMsleep(200);
if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) {
continue;
}
int64_t curTime = taosGetTimestampMs();
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
dndSendStatusReq(pDnode);
lastStatusTime = curTime;
}
// float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
// if (monitorInterval >= tsMonitorInterval) {
// dndSendMonitorReport(pDnode);
// lastMonitorTime = curTime;
// }
}
}
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
#if 0
switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE:
code = mmProcessCreateMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_MNODE:
code = mmProcessAlterMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_MNODE:
code = mmProcessDropMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_CREATE_QNODE:
code = dndProcessCreateQnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_QNODE:
code = dndProcessDropQnodeReq(pDnode, pMsg);
break;
case TDMT_DND_CREATE_SNODE:
code = dndProcessCreateSnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_SNODE:
code = dndProcessDropSnodeReq(pDnode, pMsg);
break;
case TDMT_DND_CREATE_BNODE:
code = dndProcessCreateBnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_BNODE:
code = dndProcessDropBnodeReq(pDnode, pMsg);
break;
case TDMT_DND_CONFIG_DNODE:
code = dndProcessConfigDnodeReq(pDnode, pMsg);
break;
case TDMT_MND_STATUS_RSP:
dndProcessStatusRsp(pDnode, pMsg);
break;
case TDMT_MND_AUTH_RSP:
dndProcessAuthRsp(pDnode, pMsg);
break;
case TDMT_MND_GRANT_RSP:
dndProcessGrantRsp(pDnode, pMsg);
break;
case TDMT_DND_CREATE_VNODE:
code = dndProcessCreateVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = dndProcessAlterVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = dndProcessDropVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = dndProcessSyncVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = dndProcessCompactVnodeReq(pDnode, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
break;
}
#endif
if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
taosFreeQitem(pMsg);
}
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
if (dndInitWorker(NULL, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
if (dndInitWorker(NULL, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
// pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
// if (pMgmt->threadId == NULL) {
// dError("failed to init dnode thread");
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return -1;
// }
return 0;
}
void dmStopWorker(SDnodeMgmt *pMgmt) {
#if 0
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 1) {
taosMsleep(10);
}
dndCleanupWorker(&pMgmt->readWorker);
dndCleanupWorker(&pMgmt->writeWorker);
dndCleanupWorker(&pMgmt->syncWorker);
#endif
}
......@@ -18,7 +18,7 @@
#include "mmWorker.h"
#if 0
#include "dndMgmt.h"
#include "dmMgmt.h"
int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateMnodeReq createReq = {0};
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dndMgmt.h"
#include "dmMgmt.h"
#include "dndTransport.h"
#if 0
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dndMgmt.h"
#include "dmMgmt.h"
#include "dndTransport.h"
#include "dndWorker.h"
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
// #include "dndQnode.h"
// #include "dndMgmt.h"
// #include "dmMgmt.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
// #include "dndSnode.h"
// #include "dndMgmt.h"
// #include "dmMgmt.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "vmMgmt.h"
#include "dndMgmt.h"
#include "dmMgmt.h"
#include "dndTransport.h"
// #include "sync.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册