提交 352174a5 编写于 作者: S Shengliang Guan

shm

上级 04af6494
...@@ -13,13 +13,14 @@ ...@@ -13,13 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DND_BNODE_H_ #ifndef _TD_DND_BNODE_INT_H_
#define _TD_DND_BNODE_H_ #define _TD_DND_BNODE_INT_H_
#include "dndInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h"
SMgmtFp bmGetMgmtFp(); SMgmtFp bmGetMgmtFp();
...@@ -34,4 +35,4 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); ...@@ -34,4 +35,4 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
} }
#endif #endif
#endif /*_TD_DND_BNODE_H_*/ #endif /*_TD_DND_BNODE_INT_H_*/
\ No newline at end of file \ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_BNODE_WORKER_H_
#define _TD_DND_BNODE_WORKER_H_
#include "bmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t bmStartWorker(SDnode *pDnode);
void bmStopWorker(SDnode *pDnode);
void bmInitMsgFp(SMnodeMgmt *pMgmt);
void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void bmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_BNODE_WORKER_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_HADNLE_H_
#define _TD_DND_HADNLE_H_
#include "dndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
void dndInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_HADNLE_H_*/
\ No newline at end of file
...@@ -55,7 +55,7 @@ extern "C" { ...@@ -55,7 +55,7 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
typedef enum { MNODE, NODE_MAX, VNODES, QNODE, SNODE, BNODE } ENodeType; typedef enum { DNODE, MNODE, NODE_MAX, VNODES, QNODE, SNODE, BNODE } ENodeType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
...@@ -111,6 +111,9 @@ typedef struct { ...@@ -111,6 +111,9 @@ typedef struct {
SRWLatch latch; SRWLatch latch;
SDnodeWorker mgmtWorker; SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker; SDnodeWorker statusWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
...@@ -140,6 +143,11 @@ typedef struct { ...@@ -140,6 +143,11 @@ typedef struct {
SRWLatch latch; SRWLatch latch;
SDnodeWorker queryWorker; SDnodeWorker queryWorker;
SDnodeWorker fetchWorker; SDnodeWorker fetchWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SQnodeMgmt; } SQnodeMgmt;
typedef struct { typedef struct {
...@@ -149,6 +157,11 @@ typedef struct { ...@@ -149,6 +157,11 @@ typedef struct {
SSnode *pSnode; SSnode *pSnode;
SRWLatch latch; SRWLatch latch;
SDnodeWorker writeWorker; SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SSnodeMgmt; } SSnodeMgmt;
typedef struct { typedef struct {
...@@ -169,6 +182,11 @@ typedef struct { ...@@ -169,6 +182,11 @@ typedef struct {
SBnode *pBnode; SBnode *pBnode;
SRWLatch latch; SRWLatch latch;
SDnodeWorker writeWorker; SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SBnodeMgmt; } SBnodeMgmt;
typedef struct { typedef struct {
...@@ -179,6 +197,11 @@ typedef struct { ...@@ -179,6 +197,11 @@ typedef struct {
SFWorkerPool fetchPool; SFWorkerPool fetchPool;
SWWorkerPool syncPool; SWWorkerPool syncPool;
SWWorkerPool writePool; SWWorkerPool writePool;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SVnodesMgmt; } SVnodesMgmt;
typedef struct { typedef struct {
......
...@@ -27,6 +27,11 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c ...@@ -27,6 +27,11 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
void dndCleanupWorker(SDnodeWorker *pWorker); void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
void dndProcessMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndHandle.h"
#include "dndWorker.h"
static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
void dndInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dndProcessMgmtMsg);
// Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dndProcessMgmtMsg);
}
SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
...@@ -28,135 +28,6 @@ ...@@ -28,135 +28,6 @@
#define INTERNAL_CKEY "_key" #define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
#if 0
static void dndInitMsgFp(STransMgmt *pMgmt) {
// Requests handled by DNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg;
// Requests handled by MNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_QNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_QNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_SNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_SNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_BNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_BNODE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TABLE_META)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = mmProcessRpcMsg;
// Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH_RSP)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessRpcMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
}
#endif
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent; SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DND_MNODE_MGMT_H_ #ifndef _TD_DND_MNODE_INT_H_
#define _TD_DND_MNODE_MGMT_H_ #define _TD_DND_MNODE_INT_H_
#include "dndInt.h" #include "dndInt.h"
...@@ -41,4 +41,4 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro ...@@ -41,4 +41,4 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro
} }
#endif #endif
#endif /*_TD_DND_MNODE_MGMT_H_*/ #endif /*_TD_DND_MNODE_INT_H_*/
\ No newline at end of file \ No newline at end of file
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DND_QNODE_H_ #ifndef _TD_DND_QNODE_INT_H_
#define _TD_DND_QNODE_H_ #define _TD_DND_QNODE_INT_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -35,4 +35,4 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); ...@@ -35,4 +35,4 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
} }
#endif #endif
#endif /*_TD_DND_QNODE_H_*/ #endif /*_TD_DND_QNODE_INT_H_*/
\ No newline at end of file \ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_QNODE_WORKER_H_
#define _TD_DND_QNODE_WORKER_H_
#include "qmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t qmStartWorker(SDnode *pDnode);
void qmStopWorker(SDnode *pDnode);
void qmInitMsgFp(SMnodeMgmt *pMgmt);
void qmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t qmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void qmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void qmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void qmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void qmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void qmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_QNODE_WORKER_H_*/
\ No newline at end of file
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DND_SNODE_H_ #ifndef _TD_DND_SNODE_INT_H_
#define _TD_DND_SNODE_H_ #define _TD_DND_SNODE_INT_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -34,4 +34,4 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); ...@@ -34,4 +34,4 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
} }
#endif #endif
#endif /*_TD_DND_SNODE_H_*/ #endif /*_TD_DND_SNODE_INT_H_*/
\ No newline at end of file \ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_SNODE_WORKER_H_
#define _TD_DND_SNODE_WORKER_H_
#include "smInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t smStartWorker(SDnode *pDnode);
void smStopWorker(SDnode *pDnode);
void smInitMsgFp(SMnodeMgmt *pMgmt);
void smProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t smPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t smPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void smConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void smConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void smProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void smProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_SNODE_WORKER_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODE_HANDLE_H_
#define _TD_DND_VNODE_HANDLE_H_
#include "vmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
void vmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODE_HANDLE_H_*/
\ No newline at end of file
...@@ -13,17 +13,17 @@ ...@@ -13,17 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DND_VNODES_H_ #ifndef _TD_DND_VNODE_INT_H_
#define _TD_DND_VNODES_H_ #define _TD_DND_VNODE_INT_H_
#include "dndInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h"
SMgmtFp vmGetMgmtFp() ; SMgmtFp vmGetMgmtFp() ;
int32_t dndInitVnodes(SDnode *pDnode); int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads); void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads);
...@@ -45,4 +45,4 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq); ...@@ -45,4 +45,4 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq);
} }
#endif #endif
#endif /*_TD_DND_VNODES_H_*/ #endif /*_TD_DND_VNODE_INT_H_*/
\ No newline at end of file \ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODE_WORKER_H_
#define _TD_DND_VNODE_WORKER_H_
#include "vmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t vmStartWorker(SDnode *pDnode);
void vmStopWorker(SDnode *pDnode);
void vmInitMsgFp(SMnodeMgmt *pMgmt);
void vmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t vmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void vmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void vmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessQueryMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessFetchMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODE_WORKER_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vmHandle.h"
#include "vmWorker.h"
static void vmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by VNODE
vmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg);
vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg);
}
SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册