提交 745fa09b 编写于 作者: S Shengliang Guan

refact transport

上级 2eb30ee0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_BNODE_H_
#define _TD_DND_BNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
void bmGetMgmtFp(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_BNODE_H_*/
\ No newline at end of file
......@@ -16,7 +16,7 @@
#ifndef _TD_DND_BNODE_INT_H_
#define _TD_DND_BNODE_INT_H_
#include "bm.h"
#include "dnd.h"
#include "bnode.h"
#ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_H_
#define _TD_DND_DNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SDnodeMgmt SDnodeMgmt;
void dmGetMgmtFp(SMgmtWrapper *pWrapper);
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_H_*/
\ No newline at end of file
......@@ -16,7 +16,7 @@
#ifndef _TD_DND_DNODE_INT_H_
#define _TD_DND_DNODE_INT_H_
#include "dm.h"
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
......@@ -45,6 +45,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt);
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
// dmMsg.c
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeMgmt *pMgmt);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
......
......@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "vm.h"
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
......
......@@ -14,12 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "bm.h"
#include "dmInt.h"
#include "mm.h"
#include "qm.h"
#include "sm.h"
#include "vm.h"
static void *dmThreadRoutine(void *param) {
SDnodeMgmt *pMgmt = param;
......
......@@ -165,7 +165,37 @@ int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
int32_t dndInitMsgHandle(SDnode *pDnode);
// mgmt
void dmGetMgmtFp(SMgmtWrapper *pWrapper);
void bmGetMgmtFp(SMgmtWrapper *pWrapper);
void qmGetMgmtFp(SMgmtWrapper *pMgmt);
void smGetMgmtFp(SMgmtWrapper *pWrapper);
void vmGetMgmtFp(SMgmtWrapper *pWrapper);
void mmGetMgmtFp(SMgmtWrapper *pMgmt);
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -18,13 +18,6 @@
#include "dnd.h"
#include "bm.h"
#include "dm.h"
#include "mm.h"
#include "qm.h"
#include "sm.h"
#include "vm.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -47,9 +40,6 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndTransport.c
int32_t dndInitMsgHandle(SDnode *pDnode);
// dndFile.c
TdFilePtr dndCheckRunning(const char *dataDir);
int32_t dndReadShmFile(SDnode *pDnode);
......
......@@ -25,7 +25,7 @@ const char *dndStatStr(EDndStatus status) {
case DND_STAT_STOPPED:
return "stopped";
default:
return "unknown";
return "UNKNOWN";
}
}
......
......@@ -142,20 +142,14 @@ static void dndProcessRequest(SDnode *pDnode, SRpcMsg *pReq, SEpSet *pEpSet) {
}
}
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0};
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
dndReleaseWrapper(pWrapper);
}
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0};
SMgmtWrapper *pWrapper = &pDnode->wrappers[DNODE];
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
}
static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t code = 0;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
......@@ -183,21 +177,6 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch
return 0;
}
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
if (mmGetUserAuth(pWrapper, user, spi, encrypt, secret, ckey) == 0) {
dndReleaseWrapper(pWrapper);
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
}
dndReleaseWrapper(pWrapper);
}
if (terrno != TSDB_CODE_APP_NOT_READY) {
dTrace("failed to get user auth from mnode since %s", terrstr());
return -1;
}
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
......@@ -285,11 +264,9 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
int32_t vgId = pWrapper->msgVgIds[msgIndex];
int8_t vgId = pWrapper->msgVgIds[msgIndex];
if (msgFp == NULL) continue;
// dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (vgId == QND_VGID) {
if (pHandle->pQndWrapper != NULL) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_MNODE_H_
#define _TD_DND_MNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
void mmGetMgmtFp(SMgmtWrapper *pMgmt);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_MNODE_H_*/
\ No newline at end of file
......@@ -16,7 +16,7 @@
#ifndef _TD_DND_MNODE_INT_H_
#define _TD_DND_MNODE_INT_H_
#include "mm.h"
#include "dnd.h"
#include "mnode.h"
#ifdef __cplusplus
......
......@@ -241,14 +241,6 @@ void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper->fp = mgmtFp;
}
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t code = mndRetriveAuth(pMgmt->pMnode, user, spi, encrypt, secret, ckey);
dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt);
return code;
}
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_QNODE_H_
#define _TD_DND_QNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
void qmGetMgmtFp(SMgmtWrapper *pMgmt);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_QNODE_H_*/
\ No newline at end of file
......@@ -16,7 +16,7 @@
#ifndef _TD_DND_QNODE_INT_H_
#define _TD_DND_QNODE_INT_H_
#include "qm.h"
#include "dnd.h"
#include "qnode.h"
#ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_SNODE_H_
#define _TD_DND_SNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
void smGetMgmtFp(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_SNODE_H_*/
......@@ -16,7 +16,7 @@
#ifndef _TD_DND_SNODE_INT_H_
#define _TD_DND_SNODE_INT_H_
#include "sm.h"
#include "dnd.h"
#include "snode.h"
#ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODES_H_
#define _TD_DND_VNODES_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
void vmGetMgmtFp(SMgmtWrapper *pWrapper);
void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODES_H_*/
\ No newline at end of file
......@@ -17,7 +17,7 @@
#define _TD_DND_VNODES_INT_H_
#include "sync.h"
#include "vm.h"
#include "dnd.h"
#include "vnode.h"
#ifdef __cplusplus
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册