提交 fc692411 编写于 作者: J jtao1735

messages received are handled to different modules

上级 d58c42b4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "trpc.h"
#include "tutil.h"
#include "tglobal.h"
#include "dnode.h"
#include "dnodeLog.h"
#include "dnodeMgmt.h"
static void *tsDnodeClientRpc;
static void (*dnodeProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg);
extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
int32_t dnodeInitClient() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromDnode;
rpcInit.ufp = dnodeUpdateIpSet;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.user = "t";
rpcInit.ckey = "key";
rpcInit.secret = "secret";
tsDnodeClientRpc = rpcOpen(&rpcInit);
if (tsDnodeClientRpc == NULL) {
dError("failed to init mnode rpc client");
return -1;
}
dPrint("inter-dndoes rpc client is opened");
return 0;
}
void dnodeCleanupClient() {
if (tsDnodeClientRpc) {
rpcClose(tsDnodeClientRpc);
tsDnodeClientRpc = NULL;
dPrint("inter-dnodes rpc client is closed");
}
}
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) {
if (dnodeProcessDnodeRspFp[pMsg->msgType]) {
(*dnodeProcessDnodeRspFp[pMsg->msgType])(pMsg);
} else {
dError("%s is not processed", taosMsg[pMsg->msgType]);
}
rpcFreeCont(pMsg->pCont);
}
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
dnodeProcessDnodeRspFp[msgType] = fp;
}
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg);
}
...@@ -13,6 +13,12 @@ ...@@ -13,6 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
/* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here
*/
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -21,20 +27,31 @@ ...@@ -21,20 +27,31 @@
#include "dnodeLog.h" #include "dnodeLog.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "mnode.h"
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void *tsDnodeServerRpc = NULL; static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg);
static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL;
int32_t dnodeInitServer() { int32_t dnodeInitServer() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mgmtProcessReqMsgFromDnode;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mgmtProcessReqMsgFromDnode;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mgmtProcessReqMsgFromDnode;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mgmtProcessReqMsgFromDnode;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -64,8 +81,6 @@ void dnodeCleanupServer() { ...@@ -64,8 +81,6 @@ void dnodeCleanupServer() {
} }
} }
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) {
SRpcMsg rspMsg; SRpcMsg rspMsg;
rspMsg.handle = pMsg->handle; rspMsg.handle = pMsg->handle;
...@@ -76,7 +91,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { ...@@ -76,7 +91,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) {
rspMsg.code = TSDB_CODE_NOT_READY; rspMsg.code = TSDB_CODE_NOT_READY;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle); dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
return; return;
} }
...@@ -86,10 +101,65 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { ...@@ -86,10 +101,65 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) {
return; return;
} }
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessReqMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessReqMsgFp[pMsg->msgType])(pMsg);
} else { } else {
mgmtProcessReqMsgFromDnode(pMsg); rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
dTrace("RPC %p, message:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
return;
} }
} }
int32_t dnodeInitClient() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromDnode;
rpcInit.ufp = dnodeUpdateIpSet;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.user = "t";
rpcInit.ckey = "key";
rpcInit.secret = "secret";
tsDnodeClientRpc = rpcOpen(&rpcInit);
if (tsDnodeClientRpc == NULL) {
dError("failed to init mnode rpc client");
return -1;
}
dPrint("inter-dndoes rpc client is opened");
return 0;
}
void dnodeCleanupClient() {
if (tsDnodeClientRpc) {
rpcClose(tsDnodeClientRpc);
tsDnodeClientRpc = NULL;
dPrint("inter-dnodes rpc client is closed");
}
}
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) {
if (dnodeProcessRspMsgFp[pMsg->msgType]) {
(*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg);
} else {
dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
}
rpcFreeCont(pMsg->pCont);
}
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
dnodeProcessRspMsgFp[msgType] = fp;
}
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg);
}
...@@ -41,6 +41,37 @@ int32_t dnodeInitShell() { ...@@ -41,6 +41,37 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeRead;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mgmtProcessMsgFromShell;
// the following message shall be treated as mnode query
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mgmtProcessMsgFromShell;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= mgmtProcessMsgFromShell;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
...@@ -82,7 +113,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { ...@@ -82,7 +113,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
rpcMsg.contLen = 0; rpcMsg.contLen = 0;
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
dError("RPC %p, shell msg is ignored since dnode not running", pMsg->handle); dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_NOT_READY; rpcMsg.code = TSDB_CODE_NOT_READY;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -98,7 +129,11 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { ...@@ -98,7 +129,11 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
} else { } else {
mgmtProcessMsgFromShell(pMsg); dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return;
} }
} }
......
...@@ -50,7 +50,7 @@ void * dnodeGetMnodeInfos(); ...@@ -50,7 +50,7 @@ void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,7 +22,6 @@ extern "C" { ...@@ -22,7 +22,6 @@ extern "C" {
int32_t mgmtInitServer(); int32_t mgmtInitServer();
void mgmtCleanupServer(); void mgmtCleanupServer();
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -151,7 +151,7 @@ int32_t mgmtInitDnodes() { ...@@ -151,7 +151,7 @@ int32_t mgmtInitDnodes() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DNODE, mgmtProcessDropDnodeMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DNODE, mgmtProcessDropDnodeMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta);
......
...@@ -52,7 +52,7 @@ void mgmtCleanupServer() { ...@@ -52,7 +52,7 @@ void mgmtCleanupServer() {
} }
} }
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
mgmtProcessDnodeMsgFp[msgType] = fp; mgmtProcessDnodeMsgFp[msgType] = fp;
} }
......
...@@ -43,7 +43,6 @@ typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -43,7 +43,6 @@ typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, vo
//static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); //static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg);
//static void mgmtProcessMsgFromShell(SRpcMsg *pMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg);
...@@ -52,7 +51,6 @@ static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); ...@@ -52,7 +51,6 @@ static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg); static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg);
void *tsMgmtTmr; void *tsMgmtTmr;
//static void *tsMgmtShellRpc = NULL;
static void *tsMgmtTranQhandle = NULL; static void *tsMgmtTranQhandle = NULL;
static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0};
static void *tsQhandleCache = NULL; static void *tsQhandleCache = NULL;
...@@ -121,7 +119,6 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { ...@@ -121,7 +119,6 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
} }
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
assert(rpcMsg);
if (rpcMsg->pCont == NULL) { if (rpcMsg->pCont == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
......
...@@ -542,7 +542,7 @@ int32_t mgmtInitTables() { ...@@ -542,7 +542,7 @@ int32_t mgmtInitTables() {
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables);
......
...@@ -221,7 +221,7 @@ int32_t mgmtInitVgroups() { ...@@ -221,7 +221,7 @@ int32_t mgmtInitVgroups() {
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg);
mTrace("table:vgroups is created"); mTrace("table:vgroups is created");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册