提交 3487d4f8 编写于 作者: S Shengliang Guan

shm

上级 a50e8233
......@@ -43,6 +43,22 @@ void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t bmStartWorker(SDnode *pDnode);
void bmStopWorker(SDnode *pDnode);
void bmInitMsgFp(SMnodeMgmt *pMgmt);
void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_BNODE_HANDLE_H_
#define _TD_DND_BNODE_HANDLE_H_
#include "mm.h"
#ifdef __cplusplus
extern "C" {
#endif
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_BNODE_HANDLE_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_BNODE_WORKER_H_
#define _TD_DND_BNODE_WORKER_H_
#include "bmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t bmStartWorker(SDnode *pDnode);
void bmStopWorker(SDnode *pDnode);
void bmInitMsgFp(SMnodeMgmt *pMgmt);
void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_BNODE_WORKER_H_*/
\ No newline at end of file
......@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
#include "bmMsg.h"
bool bmRequireNode(SMgmtWrapper *pWrapper) { return false; }
......
......@@ -14,8 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "bmMsg.h"
#include "bmWorker.h"
#include "bmInt.h"
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;}
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;}
......
......@@ -18,6 +18,14 @@
#include "dnd.h"
#include "bmInt.h"
#include "dm.h"
#include "dndInt.h"
#include "mm.h"
#include "qmInt.h"
#include "smInt.h"
#include "vm.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -16,10 +16,6 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "dm.h"
#include "mm.h"
#include "vmInt.h"
static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
......
......@@ -16,13 +16,6 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "bmInt.h"
#include "dm.h"
#include "mm.h"
#include "qmInt.h"
#include "smInt.h"
#include "vmInt.h"
static void dndResetLog(SMgmtWrapper *pMgmt) {
char logname[24] = {0};
snprintf(logname, sizeof(logname), "%slog", pMgmt->name);
......
......@@ -16,9 +16,6 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "dm.h"
#include "mm.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
......
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "vmInt.h"
#include "vm.h"
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
......
......@@ -14,13 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "bmInt.h"
#include "dmInt.h"
#include "mm.h"
#include "qmInt.h"
#include "smInt.h"
#include "vmInt.h"
#include "vm.h"
static void *dmThreadRoutine(void *param) {
SDnodeMgmt *pMgmt = param;
......
......@@ -16,6 +16,7 @@
#ifndef _TD_DND_MNODE_INT_H_
#define _TD_DND_MNODE_INT_H_
#include "dm.h"
#include "mm.h"
#ifdef __cplusplus
......
......@@ -16,8 +16,6 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dm.h"
SMnode *mmAcquire(SMnodeMgmt *pMgmt) {
SMnode *pMnode = NULL;
int32_t refCount = 0;
......
......@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dm.h"
int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
......
......@@ -16,8 +16,6 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dm.h"
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed", pMsg);
SMnode *pMnode = mmAcquire(pMgmt);
......
......@@ -48,6 +48,24 @@ void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
void qmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmStartWorker(SDnode *pDnode);
void qmStopWorker(SDnode *pDnode);
void qmInitMsgFp(SMnodeMgmt *pMgmt);
void qmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t qmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void qmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void qmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void qmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void qmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void qmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_QNODE_HANDLE_H_
#define _TD_DND_QNODE_HANDLE_H_
#include "qmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
void qmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_QNODE_HANDLE_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_QNODE_WORKER_H_
#define _TD_DND_QNODE_WORKER_H_
#include "qmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t qmStartWorker(SDnode *pDnode);
void qmStopWorker(SDnode *pDnode);
void qmInitMsgFp(SMnodeMgmt *pMgmt);
void qmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t qmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void qmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void qmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void qmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void qmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void qmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_QNODE_WORKER_H_*/
\ No newline at end of file
......@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
#include "qmMsg.h"
bool qmRequireNode(SMgmtWrapper *pWrapper) { return false; }
......
......@@ -14,8 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "qmMsg.h"
#include "qmWorker.h"
#include "qmInt.h"
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;}
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg){return 0;}
......
......@@ -42,6 +42,22 @@ void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
void smInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t smStartWorker(SDnode *pDnode);
void smStopWorker(SDnode *pDnode);
void smInitMsgFp(SMnodeMgmt *pMgmt);
void smProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t smPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t smPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void smConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void smConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void smProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_SNODE_HANDLE_H_
#define _TD_DND_SNODE_HANDLE_H_
#include "smInt.h"
#ifdef __cplusplus
extern "C" {
#endif
void smInitMsgHandles(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_SNODE_HANDLE_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_SNODE_WORKER_H_
#define _TD_DND_SNODE_WORKER_H_
#include "smInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t smStartWorker(SDnode *pDnode);
void smStopWorker(SDnode *pDnode);
void smInitMsgFp(SMnodeMgmt *pMgmt);
void smProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t smPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t smPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void smConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void smConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void smProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_SNODE_WORKER_H_*/
\ No newline at end of file
......@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
#include "smMsg.h"
bool smRequireNode(SMgmtWrapper *pWrapper) { return false; }
......
......@@ -14,8 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "smMsg.h"
#include "smWorker.h"
#include "smInt.h"
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;}
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;}
......
......@@ -13,25 +13,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODE_HANDLE_H_
#define _TD_DND_VNODE_HANDLE_H_
#ifndef _TD_DND_VNODES_H_
#define _TD_DND_VNODES_H_
#include "vmInt.h"
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
void vmGetMgmtFp(SMgmtWrapper *pWrapper);
void vmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODE_HANDLE_H_*/
\ No newline at end of file
#endif /*_TD_DND_VNODES_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODES_FILE_H_
#define _TD_DND_VNODES_FILE_H_
#include "vmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt);
SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODES_FILE_H_*/
\ No newline at end of file
......@@ -16,23 +16,14 @@
#ifndef _TD_DND_VNODES_INT_H_
#define _TD_DND_VNODES_INT_H_
#include "dnd.h"
#include "dm.h"
#include "sync.h"
#include "vm.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
typedef struct SVnodesMgmt {
SHashObj *hash;
SRWLatch latch;
......@@ -85,18 +76,37 @@ typedef struct {
SWrapperCfg *pCfgs;
} SVnodeThread;
// interface
void vmGetMgmtFp(SMgmtWrapper *pWrapper);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
// vmInt.h
// vmInt.c
SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
// vmMsg.c
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
// vmFile.c
int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt);
SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes);
// vmWorker.c
int32_t vmStartWorker(SVnodesMgmt *pMgmt);
void vmStopWorker(SVnodesMgmt *pMgmt);
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg);
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODE_WORKER_H_
#define _TD_DND_VNODE_WORKER_H_
#include "vmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t vmStartWorker(SVnodesMgmt *pMgmt);
void vmStopWorker(SVnodesMgmt *pMgmt);
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToQueryQueue(SVnodesMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SVnodesMgmt *pMgmt, int32_t vgId, SRpcMsg *pMsg);
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODE_WORKER_H_*/
\ No newline at end of file
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "vmFile.h"
#include "vmInt.h"
SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) {
taosRLockLatch(&pMgmt->latch);
......
......@@ -14,11 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "vmFile.h"
#include "vmMsg.h"
#include "vmWorker.h"
#include "sync.h"
#include "vmInt.h"
SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL;
......
......@@ -14,10 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "vmMsg.h"
#include "vmFile.h"
#include "vmWorker.h"
#include "dm.h"
#include "vmInt.h"
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->vgId = pCreate->vgId;
......@@ -56,7 +53,7 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S
}
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SRpcMsg *pReq = &pMsg->rpcMsg;
SCreateVnodeReq createReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -115,7 +112,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SRpcMsg *pReq = &pMsg->rpcMsg;
SAlterVnodeReq alterReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -157,7 +154,7 @@ int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -188,7 +185,7 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SRpcMsg *pReq = &pMsg->rpcMsg;
SSyncVnodeReq syncReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq);
......@@ -212,7 +209,7 @@ int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SRpcMsg *pReq = &pMsg->rpcMsg;
SCompactVnodeReq compatcReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq);
......
......@@ -14,8 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "vmWorker.h"
#include "vmMsg.h"
#include "vmInt.h"
static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册