From 3b54b74554b278fa168a30543fc937f76c1007f5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 26 May 2020 20:08:51 +0800 Subject: [PATCH] [TD-335] rename mgmt to mnode --- src/dnode/CMakeLists.txt | 2 +- src/dnode/inc/dnodeMRead.h | 31 +++ src/dnode/inc/dnodeMWrite.h | 31 +++ src/dnode/inc/dnodeMgmt.h | 2 + src/dnode/inc/dnodeMpeer.h | 31 +++ src/dnode/src/dnodeMRead.c | 160 ++++++++++++++++ src/dnode/src/dnodeMWrite.c | 155 +++++++++++++++ src/dnode/src/dnodeMain.c | 21 +- src/dnode/src/dnodeMgmt.c | 24 ++- src/dnode/src/dnodeMpeer.c | 155 +++++++++++++++ src/dnode/src/dnodePeer.c | 16 +- src/dnode/src/dnodeShell.c | 56 +++--- src/dnode/src/dnodeVRead.c | 2 +- src/dnode/src/dnodeVWrite.c | 2 +- src/inc/dnode.h | 2 +- src/inc/mnode.h | 23 ++- src/mnode/inc/{mgmtAcct.h => mnodeAcct.h} | 0 .../inc/{mgmtDClient.h => mnodeDClient.h} | 0 src/mnode/inc/{mgmtDb.h => mnodeDb.h} | 0 src/mnode/inc/{mgmtDef.h => mnodeDef.h} | 2 +- src/mnode/inc/{mgmtDnode.h => mnodeDnode.h} | 0 src/mnode/inc/{mgmtInt.h => mnodeInt.h} | 0 src/mnode/inc/{mgmtMnode.h => mnodeMnode.h} | 4 +- .../inc/{mgmtProfile.h => mnodeProfile.h} | 0 src/mnode/inc/{mgmtSdb.h => mnodeSdb.h} | 0 src/mnode/inc/{mgmtServer.h => mnodeServer.h} | 4 +- src/mnode/inc/{mgmtShell.h => mnodeShell.h} | 14 +- src/mnode/inc/{mgmtTable.h => mnodeTable.h} | 0 src/mnode/inc/{mgmtUser.h => mnodeUser.h} | 0 src/mnode/inc/{mgmtVgroup.h => mnodeVgroup.h} | 2 +- src/mnode/src/mgmtServer.c | 106 ---------- src/mnode/src/{mgmtAcct.c => mnodeAcct.c} | 0 .../src/{mgmtBalance.c => mnodeBalance.c} | 0 src/mnode/src/{mgmtDb.c => mnodeDb.c} | 22 +-- src/mnode/src/{mgmtDnode.c => mnodeDnode.c} | 28 +-- src/mnode/src/{mgmtGrant.c => mnodeGrant.c} | 0 src/mnode/src/{mgmtMain.c => mnodeMain.c} | 63 ++++-- src/mnode/src/mnodeMgmt.c | 71 +++++++ src/mnode/src/{mgmtMnode.c => mnodeMnode.c} | 4 +- .../src/{mgmtProfile.c => mnodeProfile.c} | 18 +- src/mnode/src/mnodeRead.c | 83 ++++++++ src/mnode/src/{mgmtSdb.c => mnodeSdb.c} | 0 src/mnode/src/{mgmtShell.c => mnodeShow.c} | 181 +++++------------- src/mnode/src/{mgmtTable.c => mnodeTable.c} | 72 +++---- src/mnode/src/{mgmtUser.c => mnodeUser.c} | 16 +- src/mnode/src/{mgmtVgroup.c => mnodeVgroup.c} | 14 +- src/mnode/src/mnodeWrite.c | 96 ++++++++++ 47 files changed, 1116 insertions(+), 397 deletions(-) create mode 100644 src/dnode/inc/dnodeMRead.h create mode 100644 src/dnode/inc/dnodeMWrite.h create mode 100644 src/dnode/inc/dnodeMpeer.h create mode 100644 src/dnode/src/dnodeMRead.c create mode 100644 src/dnode/src/dnodeMWrite.c create mode 100644 src/dnode/src/dnodeMpeer.c rename src/mnode/inc/{mgmtAcct.h => mnodeAcct.h} (100%) rename src/mnode/inc/{mgmtDClient.h => mnodeDClient.h} (100%) rename src/mnode/inc/{mgmtDb.h => mnodeDb.h} (100%) rename src/mnode/inc/{mgmtDef.h => mnodeDef.h} (99%) rename src/mnode/inc/{mgmtDnode.h => mnodeDnode.h} (100%) rename src/mnode/inc/{mgmtInt.h => mnodeInt.h} (100%) rename src/mnode/inc/{mgmtMnode.h => mnodeMnode.h} (93%) rename src/mnode/inc/{mgmtProfile.h => mnodeProfile.h} (100%) rename src/mnode/inc/{mgmtSdb.h => mnodeSdb.h} (100%) rename src/mnode/inc/{mgmtServer.h => mnodeServer.h} (93%) rename src/mnode/inc/{mgmtShell.h => mnodeShell.h} (75%) rename src/mnode/inc/{mgmtTable.h => mnodeTable.h} (100%) rename src/mnode/inc/{mgmtUser.h => mnodeUser.h} (100%) rename src/mnode/inc/{mgmtVgroup.h => mnodeVgroup.h} (97%) delete mode 100644 src/mnode/src/mgmtServer.c rename src/mnode/src/{mgmtAcct.c => mnodeAcct.c} (100%) rename src/mnode/src/{mgmtBalance.c => mnodeBalance.c} (100%) rename src/mnode/src/{mgmtDb.c => mnodeDb.c} (98%) rename src/mnode/src/{mgmtDnode.c => mnodeDnode.c} (96%) rename src/mnode/src/{mgmtGrant.c => mnodeGrant.c} (100%) rename src/mnode/src/{mgmtMain.c => mnodeMain.c} (80%) create mode 100644 src/mnode/src/mnodeMgmt.c rename src/mnode/src/{mgmtMnode.c => mnodeMnode.c} (98%) rename src/mnode/src/{mgmtProfile.c => mnodeProfile.c} (97%) create mode 100644 src/mnode/src/mnodeRead.c rename src/mnode/src/{mgmtSdb.c => mnodeSdb.c} (100%) rename src/mnode/src/{mgmtShell.c => mnodeShow.c} (72%) rename src/mnode/src/{mgmtTable.c => mnodeTable.c} (97%) rename src/mnode/src/{mgmtUser.c => mnodeUser.c} (96%) rename src/mnode/src/{mgmtVgroup.c => mnodeVgroup.c} (98%) create mode 100644 src/mnode/src/mnodeWrite.c diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index de6e15e6b9..2faea588a9 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_EXECUTABLE(taosd ${SRC}) - TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4) + TARGET_LINK_LIBRARIES(taosd taos_static monitor http mqtt tsdb twal vnode cJson lz4) IF (TD_ACCOUNT) TARGET_LINK_LIBRARIES(taosd account) diff --git a/src/dnode/inc/dnodeMRead.h b/src/dnode/inc/dnodeMRead.h new file mode 100644 index 0000000000..0b340a865f --- /dev/null +++ b/src/dnode/inc/dnodeMRead.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MREAD_H +#define TDENGINE_DNODE_MREAD_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitMnodeRead(); +void dnodeCleanupMnodeRead(); +void dnodeDispatchToMnodeReadQueue(SRpcMsg *rpcMsg); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeMWrite.h b/src/dnode/inc/dnodeMWrite.h new file mode 100644 index 0000000000..7a3ec93446 --- /dev/null +++ b/src/dnode/inc/dnodeMWrite.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MWRITE_H +#define TDENGINE_DNODE_MWRITE_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitMnodeWrite(); +void dnodeCleanupMnodeWrite(); +void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 6f2af423bc..949a7c6eee 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -32,6 +32,8 @@ void* dnodeGetVnodeWal(void *pVnode); void* dnodeGetVnodeTsdb(void *pVnode); void dnodeReleaseVnode(void *pVnode); +void dnodeSendRediretMsg(SRpcMsg *pMsg); + #ifdef __cplusplus } #endif diff --git a/src/dnode/inc/dnodeMpeer.h b/src/dnode/inc/dnodeMpeer.h new file mode 100644 index 0000000000..93b31c1749 --- /dev/null +++ b/src/dnode/inc/dnodeMpeer.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MMGMT_H +#define TDENGINE_DNODE_MMGMT_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitMnodeMgmt(); +void dnodeCleanupMnodeMgmt(); +void dnodeDispatchToMnodeMgmtQueue(SRpcMsg *pMsg); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c new file mode 100644 index 0000000000..cd785a804e --- /dev/null +++ b/src/dnode/src/dnodeMRead.c @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tutil.h" +#include "tqueue.h" +#include "trpc.h" +#include "twal.h" +#include "tglobal.h" +#include "mnode.h" +#include "dnode.h" +#include "dnodeInt.h" +#include "dnodeVMgmt.h" +#include "dnodeMRead.h" + +typedef struct { + pthread_t thread; + int32_t workerId; +} SMReadWorker; + +typedef struct { + int32_t num; + SMReadWorker *readWorker; +} SMReadWorkerPool; + +static SMReadWorkerPool tsMReadPool; +static taos_qset tsMReadQset; +static taos_queue tsMReadQueue; + +static void *dnodeProcessMnodeReadQueue(void *param); + +int32_t dnodeInitMnodeRead() { + tsMReadQset = taosOpenQset(); + + tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2; + tsMReadPool.num = MAX(2, tsMReadPool.num); + tsMReadPool.num = MIN(4, tsMReadPool.num); + tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num); + + if (tsMReadPool.readWorker == NULL) return -1; + for (int32_t i = 0; i < tsMReadPool.num; ++i) { + SMReadWorker *pWorker = tsMReadPool.readWorker + i; + pWorker->workerId = i; + } + + dPrint("dnode mread is opened"); + return 0; +} + +void dnodeCleanupMnodeRead() { + for (int32_t i = 0; i < tsMReadPool.num; ++i) { + SMReadWorker *pWorker = tsMReadPool.readWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsMReadQset); + } + } + + for (int32_t i = 0; i < tsMReadPool.num; ++i) { + SMReadWorker *pWorker = tsMReadPool.readWorker + i; + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + } + + taosCloseQset(tsMReadQset); + free(tsMReadPool.readWorker); + + dPrint("dnode mread is closed"); +} + +int32_t dnodeAllocateMnodeRqueue() { + tsMReadQueue = taosOpenQueue(); + if (tsMReadQueue == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; + + taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL); + + for (int32_t i = 0; i < tsMReadPool.num; ++i) { + SMReadWorker *pWorker = tsMReadPool.readWorker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeReadQueue, pWorker) != 0) { + dError("failed to create thread to process mread queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + dTrace("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num); + } + + dTrace("dnode mread queue:%p is allocated", tsMReadQueue); + return TSDB_CODE_SUCCESS; +} + +void dnodeFreeMnodeRqueue() { + taosCloseQueue(tsMReadQueue); + tsMReadQueue = NULL; +} + +void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) { + if (!mnodeIsRunning() || tsMReadQueue == NULL) { + dnodeSendRediretMsg(pMsg); + return; + } + + SMnodeMsg *pRead = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); + pRead->rpcMsg = *pMsg; + taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); +} + +static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) { + if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; + + SRpcMsg rpcRsp = { + .handle = pRead->rpcMsg.handle, + .pCont = pRead->rspRet.rsp, + .contLen = pRead->rspRet.len, + .code = pRead->rspRet.code, + }; + + rpcSendResponse(&rpcRsp); + rpcFreeCont(pRead->rpcMsg.pCont); +} + +static void *dnodeProcessMnodeReadQueue(void *param) { + SMnodeMsg *pReadMsg; + int32_t type; + void * unUsed; + + while (1) { + if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) { + dTrace("dnodeProcessMnodeReadQueue: got no message from qset, exiting..."); + break; + } + + dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); + int32_t code = mnodeProcessRead(pReadMsg); + dnodeSendRpcMnodeReadRsp(pReadMsg, code); + taosFreeQitem(pReadMsg); + } + + return NULL; +} diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c new file mode 100644 index 0000000000..b768b31b01 --- /dev/null +++ b/src/dnode/src/dnodeMWrite.c @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tutil.h" +#include "tqueue.h" +#include "trpc.h" +#include "twal.h" +#include "tglobal.h" +#include "mnode.h" +#include "dnode.h" +#include "dnodeInt.h" +#include "dnodeVMgmt.h" +#include "dnodeMWrite.h" + +typedef struct { + pthread_t thread; + int32_t workerId; +} SMWriteWorker; + +typedef struct { + int32_t num; + SMWriteWorker *writeWorker; +} SMWriteWorkerPool; + +static SMWriteWorkerPool tsMWritePool; +static taos_qset tsMWriteQset; +static taos_queue tsMWriteQueue; + +static void *dnodeProcessMnodeWriteQueue(void *param); + +int32_t dnodeInitMnodeWrite() { + tsMWriteQset = taosOpenQset(); + + tsMWritePool.num = 1; + tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num); + + if (tsMWritePool.writeWorker == NULL) return -1; + for (int32_t i = 0; i < tsMWritePool.num; ++i) { + SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + pWorker->workerId = i; + } + + dPrint("dnode mwrite is opened"); + return 0; +} + +void dnodeCleanupMnodeWrite() { + for (int32_t i = 0; i < tsMWritePool.num; ++i) { + SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsMWriteQset); + } + } + + for (int32_t i = 0; i < tsMWritePool.num; ++i) { + SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + } + + dPrint("dnode mwrite is closed"); +} + +int32_t dnodeAllocateMnodeRqueue() { + tsMWriteQueue = taosOpenQueue(); + if (tsMWriteQueue == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; + + taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL); + + for (int32_t i = 0; i < tsMWritePool.num; ++i) { + SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeWriteQueue, pWorker) != 0) { + dError("failed to create thread to process mwrite queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + dTrace("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num); + } + + dTrace("dnode mwrite queue:%p is allocated", tsMWriteQueue); + return TSDB_CODE_SUCCESS; +} + +void dnodeFreeMnodeRqueue() { + taosCloseQueue(tsMWriteQueue); + tsMWriteQueue = NULL; +} + +void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { + if (!mnodeIsRunning() || tsMWriteQueue == NULL) { + dnodeSendRediretMsg(pMsg); + return; + } + + SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); + pWrite->rpcMsg = *pMsg; + taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); +} + +static void dnodeSendRpcMnodeWriteRsp(SMnodeMsg *pWrite, int32_t code) { + if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; + + SRpcMsg rpcRsp = { + .handle = pWrite->rpcMsg.handle, + .pCont = pWrite->rspRet.rsp, + .contLen = pWrite->rspRet.len, + .code = pWrite->rspRet.code, + }; + + rpcSendResponse(&rpcRsp); + rpcFreeCont(pWrite->rpcMsg.pCont); +} + +static void *dnodeProcessMnodeWriteQueue(void *param) { + SMnodeMsg *pWriteMsg; + int32_t type; + void * unUsed; + + while (1) { + if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWriteMsg, &unUsed) == 0) { + dTrace("dnodeProcessMnodeWriteQueue: got no message from qset, exiting..."); + break; + } + + dTrace("%p, msg:%s will be processed", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]); + int32_t code = mnodeProcessWrite(pWriteMsg); + dnodeSendRpcMnodeWriteRsp(pWriteMsg, code); + taosFreeQitem(pWriteMsg); + } + + return NULL; +} diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 68fe986989..e9e9480aef 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -21,12 +21,15 @@ #include "tglobal.h" #include "dnode.h" #include "dnodeInt.h" -#include "dnodeMgmt.h" +#include "dnodeVMgmt.h" #include "dnodePeer.h" #include "dnodeModule.h" #include "dnodeVRead.h" -#include "dnodeShell.h" #include "dnodeVWrite.h" +#include "dnodeMRead.h" +#include "dnodeMWrite.h" +#include "dnodeMMgmt.h" +#include "dnodeShell.h" static int32_t dnodeInitStorage(); static void dnodeCleanupStorage(); @@ -65,8 +68,11 @@ int32_t dnodeInitSystem() { dPrint("start to initialize TDengine on %s", tsLocalEp); if (dnodeInitStorage() != 0) return -1; - if (dnodeInitRead() != 0) return -1; - if (dnodeInitWrite() != 0) return -1; + if (dnodeInitVnodeRead() != 0) return -1; + if (dnodeInitVnodeWrite() != 0) return -1; + if (dnodeInitMnodeRead() != 0) return -1; + if (dnodeInitMnodeWrite() != 0) return -1; + if (dnodeInitMnodeMgmt() != 0) return -1; if (dnodeInitClient() != 0) return -1; if (dnodeInitServer() != 0) return -1; if (dnodeInitMgmt() != 0) return -1; @@ -89,8 +95,11 @@ void dnodeCleanUpSystem() { dnodeCleanupMgmt(); dnodeCleanupServer(); dnodeCleanupClient(); - dnodeCleanupWrite(); - dnodeCleanupRead(); + dnodeCleanupMnodeMgmt(); + dnodeCleanupMnodeWrite(); + dnodeCleanupMnodeRead(); + dnodeCleanupVnodeWrite(); + dnodeCleanupVnodeRead(); dnodeCleanupStorage(); taos_cleanup(); taosCloseLog(); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 4b28992aa4..442b517846 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -32,7 +32,7 @@ #include "vnode.h" #include "mnode.h" #include "dnodeInt.h" -#include "dnodeMgmt.h" +#include "dnodeVMgmt.h" #include "dnodeVRead.h" #include "dnodeVWrite.h" #include "dnodeModule.h" @@ -274,13 +274,16 @@ void dnodeUpdateIpSet(SRpcIpSet *pIpSet) { tsMnodeIpSet = *pIpSet; } -void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) { +void dnodeGetMnodeDnodeIpSet(void *ipSetRaw, bool encode) { SRpcIpSet *ipSet = ipSetRaw; ipSet->numOfIps = tsMnodeInfos.nodeNum; ipSet->inUse = tsMnodeInfos.inUse; for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) { taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]); ipSet->port[i] += TSDB_PORT_DNODEDNODE; + if (encode) { + ipSet->port[i] = htons(ipSet->port[i]); + } } } @@ -590,3 +593,20 @@ int32_t dnodeGetDnodeId() { return tsDnodeCfg.dnodeId; } +void dnodeSendRediretMsg(SRpcMsg *rpcMsg) { + SRpcConnInfo connInfo; + rpcGetConnInfo(rpcMsg->handle, &connInfo); + + SRpcIpSet ipSet = {0}; + dnodeGetMnodeDnodeIpSet(&ipSet); + + dTrace("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[rpcMsg->msgType], + taosIpStr(connInfo.clientIp), connInfo.user, ipSet.numOfIps, ipSet.inUse); + + for (int i = 0; i < ipSet.numOfIps; ++i) { + dTrace("mnode index:%d %s:%d", i, ipSet.fqdn[i], ipSet.port[i]); + ipSet.port[i] = htons(ipSet.port[i]); + } + + rpcSendRedirectRsp(rpcMsg->handle, &ipSet); +} \ No newline at end of file diff --git a/src/dnode/src/dnodeMpeer.c b/src/dnode/src/dnodeMpeer.c new file mode 100644 index 0000000000..d1bfa3a048 --- /dev/null +++ b/src/dnode/src/dnodeMpeer.c @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tutil.h" +#include "tqueue.h" +#include "trpc.h" +#include "twal.h" +#include "tglobal.h" +#include "mnode.h" +#include "dnode.h" +#include "dnodeInt.h" +#include "dnodeVMgmt.h" +#include "dnodeMWrite.h" + +typedef struct { + pthread_t thread; + int32_t workerId; +} SMMgmtWorker; + +typedef struct { + int32_t num; + SMMgmtWorker *mgmtWorker; +} SMMgmtWorkerPool; + +static SMMgmtWorkerPool tsMMgmtPool; +static taos_qset tsMMgmtQset; +static taos_queue tsMMgmtQueue; + +static void *dnodeProcessMnodeMgmtQueue(void *param); + +int32_t dnodeInitMnodeMgmt() { + tsMMgmtQset = taosOpenQset(); + + tsMMgmtPool.num = 1; + tsMMgmtPool.mgmtWorker = (SMMgmtWorker *)calloc(sizeof(SMMgmtWorker), tsMMgmtPool.num); + + if (tsMMgmtPool.mgmtWorker == NULL) return -1; + for (int32_t i = 0; i < tsMMgmtPool.num; ++i) { + SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i; + pWorker->workerId = i; + } + + dPrint("dnode mmgmt is opened"); + return 0; +} + +void dnodeCleanupMnodeMgmt() { + for (int32_t i = 0; i < tsMMgmtPool.num; ++i) { + SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsMMgmtQset); + } + } + + for (int32_t i = 0; i < tsMMgmtPool.num; ++i) { + SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i; + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + } + + dPrint("dnode mmgmt is closed"); +} + +int32_t dnodeAllocateMnodeMqueue() { + tsMMgmtQueue = taosOpenQueue(); + if (tsMMgmtQueue == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; + + taosAddIntoQset(tsMMgmtQset, tsMMgmtQueue, NULL); + + for (int32_t i = 0; i < tsMMgmtPool.num; ++i) { + SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeMgmtQueue, pWorker) != 0) { + dError("failed to create thread to process mmgmt queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + dTrace("dnode mmgmt worker:%d is launched, total:%d", pWorker->workerId, tsMMgmtPool.num); + } + + dTrace("dnode mmgmt queue:%p is allocated", tsMMgmtQueue); + return TSDB_CODE_SUCCESS; +} + +void dnodeFreeMnodeRqueue() { + taosCloseQueue(tsMMgmtQueue); + tsMMgmtQueue = NULL; +} + +void dnodeDispatchToMnodeMgmtQueue(SRpcMsg *pMsg) { + if (!mnodeIsRunning() || tsMMgmtQueue == NULL) { + dnodeSendRediretMsg(pMsg); + return; + } + + SMnodeMsg *pMgmt = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); + pMgmt->rpcMsg = *pMsg; + taosWriteQitem(tsMMgmtQueue, TAOS_QTYPE_RPC, pMgmt); +} + +static void dnodeSendRpcMnodeMgmtRsp(SMnodeMsg *pMgmt, int32_t code) { + if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; + + SRpcMsg rpcRsp = { + .handle = pMgmt->rpcMsg.handle, + .pCont = pMgmt->rspRet.rsp, + .contLen = pMgmt->rspRet.len, + .code = pMgmt->rspRet.code, + }; + + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMgmt->rpcMsg.pCont); +} + +static void *dnodeProcessMnodeMgmtQueue(void *param) { + SMnodeMsg *pMgmtMsg; + int32_t type; + void * unUsed; + + while (1) { + if (taosReadQitemFromQset(tsMMgmtQset, &type, (void **)&pMgmtMsg, &unUsed) == 0) { + dTrace("dnodeProcessMnodeMgmtQueue: got no message from qset, exiting..."); + break; + } + + dTrace("%p, msg:%s will be processed", pMgmtMsg->rpcMsg.ahandle, taosMsg[pMgmtMsg->rpcMsg.msgType]); + int32_t code = mnodeProcessMgmt(pMgmtMsg); + dnodeSendRpcMnodeMgmtRsp(pMgmtMsg, code); + taosFreeQitem(pMgmtMsg); + } + + return NULL; +} diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index c91da4953d..9695de1209 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -25,8 +25,10 @@ #include "trpc.h" #include "dnode.h" #include "dnodeInt.h" -#include "dnodeMgmt.h" +#include "dnodeVMgmt.h" #include "dnodeVWrite.h" +#include "dnodeMRead.h" +#include "dnodeMWrite.h" #include "mnode.h" extern void dnodeUpdateIpSet(SRpcIpSet *pIpSet); @@ -48,11 +50,11 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mgmtProcessReqMsgFromDnode; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mgmtProcessReqMsgFromDnode; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mgmtProcessReqMsgFromDnode; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mgmtProcessReqMsgFromDnode; - dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mgmtProcessReqMsgFromDnode; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodeReadQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodeReadQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMnodeReadQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMnodeWriteQueue; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -167,6 +169,6 @@ void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcIpSet ipSet = {0}; - dnodeGetMnodeDnodeIpSet(&ipSet); + dnodeGetMnodeDnodeIpSet(&ipSet, false); rpcSendRecv(tsDnodeClientRpc, &ipSet, rpcMsg, rpcRsp); } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index fbed164839..031d860b1e 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -26,6 +26,8 @@ #include "dnodeInt.h" #include "dnodeVRead.h" #include "dnodeVWrite.h" +#include "dnodeMRead.h" +#include "dnodeMWrite.h" #include "dnodeShell.h" static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); @@ -43,35 +45,35 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; // the following message shall be treated as mnode write - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMnodeWriteQueue; // the following message shall be treated as mnode query - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mgmtProcessMsgFromShell; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeReadQueue; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 81d14702b1..34bbdb5788 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -23,7 +23,7 @@ #include "twal.h" #include "tglobal.h" #include "dnodeInt.h" -#include "dnodeMgmt.h" +#include "dnodeVMgmt.h" #include "dnodeVRead.h" #include "vnode.h" diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index bf4e49e84d..3783b857d1 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -27,7 +27,7 @@ #include "tdataformat.h" #include "dnodeInt.h" #include "dnodeVWrite.h" -#include "dnodeMgmt.h" +#include "dnodeVMgmt.h" typedef struct { taos_qall qall; diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 9884cf2870..bcab82cc5d 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -45,7 +45,7 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); bool dnodeIsFirstDeploy(); char *dnodeGetMnodeMasterEp(); -void dnodeGetMnodeDnodeIpSet(void *ipSet); +void dnodeGetMnodeDnodeIpSet(void *ipSet, bool encode); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 48b1ac97bd..f025cb5e8a 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -20,17 +20,38 @@ extern "C" { #endif +typedef struct { + int len; + int code; + void *rsp; +} SMnodeRsp; + +typedef struct { + SRpcMsg rpcMsg; + SMnodeRsp rpcRsp; +} SMnodeMsg; + +SMnodeMsg *mnodeCreateMsg(SRpcMsg *rpcMsg); +bool mnodeInitMsg(SMnodeMsg *pMsg); +void mnodeRleaseMsg(SMnodeMsg *pMsg); + int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); void mgmtStopSystem(); void sdbUpdateSync(); +void* mnodeGetRqueue(void *); +void* mnodeGetWqueue(int32_t vgId); +bool mnodeIsRunning(); +int32_t mnodeProcessRead(SMnodeMsg *pMsg); +int32_t mnodeProcessWrite(SMnodeMsg *pMsg); +int32_t mnodeProcessMgmt(SMnodeMsg *pMsg); + int32_t mgmtRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg); void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtAcct.h b/src/mnode/inc/mnodeAcct.h similarity index 100% rename from src/mnode/inc/mgmtAcct.h rename to src/mnode/inc/mnodeAcct.h diff --git a/src/mnode/inc/mgmtDClient.h b/src/mnode/inc/mnodeDClient.h similarity index 100% rename from src/mnode/inc/mgmtDClient.h rename to src/mnode/inc/mnodeDClient.h diff --git a/src/mnode/inc/mgmtDb.h b/src/mnode/inc/mnodeDb.h similarity index 100% rename from src/mnode/inc/mgmtDb.h rename to src/mnode/inc/mnodeDb.h diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mnodeDef.h similarity index 99% rename from src/mnode/inc/mgmtDef.h rename to src/mnode/inc/mnodeDef.h index 9d3e46205d..07920403b1 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -251,7 +251,7 @@ typedef struct { SDbObj *pDb; SVgObj *pVgroup; STableObj *pTable; -} SQueuedMsg; +} SMnodeMsg; #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mnodeDnode.h similarity index 100% rename from src/mnode/inc/mgmtDnode.h rename to src/mnode/inc/mnodeDnode.h diff --git a/src/mnode/inc/mgmtInt.h b/src/mnode/inc/mnodeInt.h similarity index 100% rename from src/mnode/inc/mgmtInt.h rename to src/mnode/inc/mnodeInt.h diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mnodeMnode.h similarity index 93% rename from src/mnode/inc/mgmtMnode.h rename to src/mnode/inc/mnodeMnode.h index 0973aa6ea6..123df73fb2 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -42,7 +42,9 @@ void mgmtIncMnodeRef(struct SMnodeObj *pMnode); void mgmtDecMnodeRef(struct SMnodeObj *pMnode); char * mgmtGetMnodeRoleStr(); -void mgmtGetMnodeIpSet(SRpcIpSet *ipSet); +void mgmtGetMnodeIpSetForPeer(SRpcIpSet *ipSet); +void mgmtGetMnodeIpSetForShell(SRpcIpSet *ipSet); + void mgmtGetMnodeInfos(void *mnodes); void mgmtUpdateMnodeIpSet(); diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mnodeProfile.h similarity index 100% rename from src/mnode/inc/mgmtProfile.h rename to src/mnode/inc/mnodeProfile.h diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mnodeSdb.h similarity index 100% rename from src/mnode/inc/mgmtSdb.h rename to src/mnode/inc/mnodeSdb.h diff --git a/src/mnode/inc/mgmtServer.h b/src/mnode/inc/mnodeServer.h similarity index 93% rename from src/mnode/inc/mgmtServer.h rename to src/mnode/inc/mnodeServer.h index 08e4463ad8..69df500bbc 100644 --- a/src/mnode/inc/mgmtServer.h +++ b/src/mnode/inc/mnodeServer.h @@ -20,8 +20,8 @@ extern "C" { #endif -int32_t mgmtInitServer(); -void mgmtCleanupServer(); +int32_t mnodeInitMgmt(); +void mgmtCleanupMgmt(); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mnodeShell.h similarity index 75% rename from src/mnode/inc/mgmtShell.h rename to src/mnode/inc/mnodeShell.h index c3ae3e96e8..976dc360f3 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mnodeShell.h @@ -23,15 +23,15 @@ extern "C" { int32_t mgmtInitShell(); void mgmtCleanUpShell(); -void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SQueuedMsg *queuedMsg)); +void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *queuedMsg)); typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp); -void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); +void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp); +void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); -void mgmtAddToShellQueue(SQueuedMsg *queuedMsg); -void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg); +void mgmtAddToShellQueue(SMnodeMsg *queuedMsg); +void mgmtDealyedAddToShellQueue(SMnodeMsg *queuedMsg); void mgmtSendSimpleResp(void *thandle, int32_t code); bool mgmtCheckQhandle(uint64_t qhandle); @@ -39,8 +39,8 @@ void *mgmtSaveQhandle(void *qhandle, int32_t size); void mgmtFreeQhandle(void *qhandle, bool forceRemove); void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg); -void *mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg); -void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); +void *mgmtCloneQueuedMsg(SMnodeMsg *pSrcMsg); +void mgmtFreeQueuedMsg(SMnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mnodeTable.h similarity index 100% rename from src/mnode/inc/mgmtTable.h rename to src/mnode/inc/mnodeTable.h diff --git a/src/mnode/inc/mgmtUser.h b/src/mnode/inc/mnodeUser.h similarity index 100% rename from src/mnode/inc/mgmtUser.h rename to src/mnode/inc/mnodeUser.h diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mnodeVgroup.h similarity index 97% rename from src/mnode/inc/mgmtVgroup.h rename to src/mnode/inc/mnodeVgroup.h index ab0345cd20..948aec06e5 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mnodeVgroup.h @@ -35,7 +35,7 @@ void * mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *dnodeId, SVnodeLoad *pVload); -void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb); +void mgmtCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); diff --git a/src/mnode/src/mgmtServer.c b/src/mnode/src/mgmtServer.c deleted file mode 100644 index 7810189e34..0000000000 --- a/src/mnode/src/mgmtServer.c +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "taoserror.h" -#include "trpc.h" -#include "tsched.h" -#include "tsystem.h" -#include "tutil.h" -#include "tgrant.h" -#include "tbalance.h" -#include "tglobal.h" -#include "dnode.h" -#include "mgmtDef.h" -#include "mgmtInt.h" -#include "mgmtDb.h" -#include "mgmtMnode.h" -#include "mgmtProfile.h" -#include "mgmtShell.h" -#include "mgmtSdb.h" -#include "mgmtTable.h" -#include "mgmtVgroup.h" - -static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void *tsMgmtServerQhandle = NULL; - -int32_t mgmtInitServer() { - - tsMgmtServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS"); - - mPrint("server connection to dnode is opened"); - return 0; -} - -void mgmtCleanupServer() { - if (tsMgmtServerQhandle) { - taosCleanUpScheduler(tsMgmtServerQhandle); - tsMgmtServerQhandle = NULL; - } -} - -void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { - mgmtProcessDnodeMsgFp[msgType] = fp; -} - -static void mgmtProcessRequestFromDnode(SSchedMsg *sched) { - SRpcMsg *pMsg = sched->msg; - (*mgmtProcessDnodeMsgFp[pMsg->msgType])(pMsg); - rpcFreeCont(pMsg->pCont); - free(pMsg); -} - -static void mgmtAddToServerQueue(SRpcMsg *pMsg) { - SSchedMsg schedMsg; - schedMsg.msg = pMsg; - schedMsg.fp = mgmtProcessRequestFromDnode; - taosScheduleTask(tsMgmtServerQhandle, &schedMsg); -} - -void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg) { - if (mgmtProcessDnodeMsgFp[rpcMsg->msgType] == NULL) { - mError("%s is not processed in mnode", taosMsg[rpcMsg->msgType]); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED); - rpcFreeCont(rpcMsg->pCont); - } - - if (rpcMsg->pCont == NULL) { - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); - return; - } - - if (!sdbIsMaster()) { - SRpcConnInfo connInfo; - rpcGetConnInfo(rpcMsg->handle, &connInfo); - - SRpcIpSet ipSet = {0}; - dnodeGetMnodeDnodeIpSet(&ipSet); - for (int i = 0; i < ipSet.numOfIps; ++i) - ipSet.port[i] = htons(ipSet.port[i]); - - mTrace("conn from dnode ip:%s user:%s redirect msg, inUse:%d", taosIpStr(connInfo.clientIp), connInfo.user, ipSet.inUse); - for (int32_t i = 0; i < ipSet.numOfIps; ++i) { - mTrace("mnode index:%d %s:%d", i, ipSet.fqdn[i], htons(ipSet.port[i])); - } - rpcSendRedirectRsp(rpcMsg->handle, &ipSet); - return; - } - - SRpcMsg *pMsg = malloc(sizeof(SRpcMsg)); - memcpy(pMsg, rpcMsg, sizeof(SRpcMsg)); - mgmtAddToServerQueue(pMsg); -} - diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mnodeAcct.c similarity index 100% rename from src/mnode/src/mgmtAcct.c rename to src/mnode/src/mnodeAcct.c diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mnodeBalance.c similarity index 100% rename from src/mnode/src/mgmtBalance.c rename to src/mnode/src/mnodeBalance.c diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mnodeDb.c similarity index 98% rename from src/mnode/src/mgmtDb.c rename to src/mnode/src/mnodeDb.c index df18fe8786..09e53d13c9 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mnodeDb.c @@ -41,13 +41,13 @@ static void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); -static void mgmtDropDb(SQueuedMsg *newMsg); +static void mgmtDropDb(SMnodeMsg *newMsg); static int32_t mgmtSetDbDropping(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); -static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg); -static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg); +static void mgmtProcessCreateDbMsg(SMnodeMsg *pMsg); +static void mgmtProcessAlterDbMsg(SMnodeMsg *pMsg); +static void mgmtProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mgmtDbActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); @@ -150,8 +150,8 @@ int32_t mgmtInitDbs() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mgmtProcessCreateDbMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mgmtProcessAlterDbMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mgmtProcessDropDbMsg); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DB, mgmtGetDbMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mgmtRetrieveDbs); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mgmtGetDbMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mgmtRetrieveDbs); mTrace("table:dbs table is created"); return 0; @@ -748,7 +748,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) { return code; } -static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { +static void mgmtProcessCreateDbMsg(SMnodeMsg *pMsg) { SCMCreateDbMsg *pCreate = pMsg->pCont; pCreate->maxTables = htonl(pCreate->maxTables); @@ -935,7 +935,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { return TSDB_CODE_SUCCESS; } -static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { +static void mgmtProcessAlterDbMsg(SMnodeMsg *pMsg) { SCMAlterDbMsg *pAlter = pMsg->pCont; mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle); @@ -963,7 +963,7 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); } -static void mgmtDropDb(SQueuedMsg *pMsg) { +static void mgmtDropDb(SMnodeMsg *pMsg) { SDbObj *pDb = pMsg->pDb; mPrint("db:%s, drop db from sdb", pDb->name); @@ -980,7 +980,7 @@ static void mgmtDropDb(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, code); } -static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropDbMsg(SMnodeMsg *pMsg) { SCMDropDbMsg *pDrop = pMsg->pCont; mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle); @@ -1022,7 +1022,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { SVgObj *pVgroup = pMsg->pDb->pHead; if (pVgroup != NULL) { mPrint("vgId:%d, will be dropped", pVgroup->vgId); - SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); + SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pVgroup; newMsg->expected = pVgroup->numOfVnodes; mgmtDropVgroup(pVgroup, newMsg); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mnodeDnode.c similarity index 96% rename from src/mnode/src/mgmtDnode.c rename to src/mnode/src/mnodeDnode.c index 93c8276b14..ce0b1d07e9 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -42,9 +42,9 @@ extern void * tsMnodeSdb; extern void * tsVgroupSdb; static int32_t mgmtCreateDnode(char *ep); -static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg); -static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg); -static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg); +static void mgmtProcessCreateDnodeMsg(SMnodeMsg *pMsg); +static void mgmtProcessDropDnodeMsg(SMnodeMsg *pMsg); +static void mgmtProcessCfgDnodeMsg(SMnodeMsg *pMsg); static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ; static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); @@ -153,14 +153,14 @@ int32_t mgmtInitDnodes() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DNODE, mgmtGetDnodeMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, mgmtRetrieveDnodes); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DNODE, mgmtGetDnodeMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, mgmtRetrieveDnodes); mTrace("table:dnodes table is created"); return 0; @@ -236,7 +236,7 @@ void mgmtUpdateDnode(SDnodeObj *pDnode) { sdbUpdateRow(&oper); } -void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { +void mgmtProcessCfgDnodeMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont; @@ -451,7 +451,7 @@ static int32_t mgmtDropDnodeByEp(char *ep) { #endif } -static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { +static void mgmtProcessCreateDnodeMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SCMCreateDnodeMsg *pCreate = pMsg->pCont; @@ -472,7 +472,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { } -static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropDnodeMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SCMDropDnodeMsg *pDrop = pMsg->pCont; diff --git a/src/mnode/src/mgmtGrant.c b/src/mnode/src/mnodeGrant.c similarity index 100% rename from src/mnode/src/mgmtGrant.c rename to src/mnode/src/mnodeGrant.c diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mnodeMain.c similarity index 80% rename from src/mnode/src/mgmtMain.c rename to src/mnode/src/mnodeMain.c index 2a8e139eec..204a8a638e 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mnodeMain.c @@ -35,9 +35,13 @@ #include "mgmtTable.h" #include "mgmtShell.h" -extern void *tsMgmtTmr; +static void *tsMgmtTmr; static bool tsMgmtIsRunning = false; +static void mnodeInitTimer(); +static void mnodeCleanupTimer(); +static bool mnodeNeedStart() ; + int32_t mgmtStartSystem() { if (tsMgmtIsRunning) { mPrint("TDengine mgmt module already started..."); @@ -99,7 +103,7 @@ int32_t mgmtStartSystem() { return -1; } - if (mgmtInitServer() < 0) { + if (mnodeInitMgmt() < 0) { return -1; } @@ -112,16 +116,11 @@ int32_t mgmtStartSystem() { } int32_t mgmtInitSystem() { - if (mgmtInitShell() != 0) { - mError("failed to init shell"); - return -1; - } - - struct stat dirstat; - bool fileExist = (stat(tsMnodeDir, &dirstat) == 0); - bool asMaster = (strcmp(tsFirst, tsLocalEp) == 0); + mnodeInitTimer(); + mnodeInitRead(); + mnodeInitWrite(); - if (asMaster || fileExist) { + if (mnodeNeedStart()) { if (mgmtStartSystem() != 0) { return -1; } @@ -133,8 +132,12 @@ int32_t mgmtInitSystem() { void mgmtCleanUpSystem() { mPrint("starting to clean up mgmt"); tsMgmtIsRunning = false; - mgmtCleanUpShell(); - mgmtCleanupServer(); + + mnodeCleanupTimer(); + mnodeCleanupRead(); + mnodeCleanupWrite(); + + mgmtCleanupMgmt(); grantCleanUp(); balanceCleanUp(); sdbCleanUp(); @@ -153,9 +156,43 @@ void mgmtStopSystem() { mTrace("it is a master mgmt node, it could not be stopped"); return; } + mgmtCleanUpSystem(); mPrint("mgmt file is removed"); remove(tsMnodeDir); } + + + +void* mnodeGetWqueue(int32_t vgId) { + +} + + + +static void mnodeInitTimer() { + if (tsMgmtTmr != NULL) { + tsMgmtTmr = taosTmrInit((tsMaxShellConns)*3, 200, 3600000, "MND"); + } +} + +static void mnodeCleanupTimer() { + if (tsMgmtTmr != NULL) { + taosTmrCleanUp(tsMgmtTmr); + tsMgmtTmr = NULL; + } +} + +static bool mnodeNeedStart() { + struct stat dirstat; + bool fileExist = (stat(tsMnodeDir, &dirstat) == 0); + bool asMaster = (strcmp(tsFirst, tsLocalEp) == 0); + + if (asMaster || fileExist) { + return true; + } + + return false; +} diff --git a/src/mnode/src/mnodeMgmt.c b/src/mnode/src/mnodeMgmt.c new file mode 100644 index 0000000000..8833462406 --- /dev/null +++ b/src/mnode/src/mnodeMgmt.c @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "trpc.h" +#include "tsched.h" +#include "tsystem.h" +#include "tutil.h" +#include "tgrant.h" +#include "tbalance.h" +#include "tglobal.h" +#include "dnode.h" +#include "mgmtDef.h" +#include "mgmtInt.h" +#include "mgmtDb.h" +#include "mgmtMnode.h" +#include "mgmtProfile.h" +#include "mgmtShell.h" +#include "mgmtSdb.h" +#include "mgmtTable.h" +#include "mgmtVgroup.h" + +static void (*tsMnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *); + +void mnodeAddMgmtMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *pMsg)) { + tsMnodeProcessMgmtMsgFp[msgType] = fp; +} + +int32_t mnodeProcessMgmt(SMnodeMsg *pMsg) { + SRpcMsg *rpcMsg = &pMsg->rpcMsg; + if (rpcMsg->pCont == NULL) { + mError("%p, msg:%s content is null", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_INVALID_MSG_LEN; + } + + if (!sdbIsMaster()) { + SMnodeRsp *rpcRsp = &pMsg->rpcRsp; + SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); + mgmtGetMnodeIpSetForPeer(ipSet); + rpcRsp->rsp = ipSet; + rpcRsp->len = sizeof(SRpcIpSet); + + mTrace("%p, msg:%s will be redireced, inUse:%d", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], ipSet->inUse); + for (int32_t i = 0; i < ipSet->numOfIps; ++i) { + mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + } + + return TSDB_CODE_REDIRECT; + } + + if (tsMnodeProcessMgmtMsgFp[rpcMsg->msgType] == NULL) { + mError("%p, msg:%s not processed, no handle exist", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_MSG_NOT_PROCESSED; + } + + return (*tsMnodeProcessMgmtMsgFp[rpcMsg->msgType])(rpcMsg, ); +} diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mnodeMnode.c similarity index 98% rename from src/mnode/src/mgmtMnode.c rename to src/mnode/src/mnodeMnode.c index 6471b7f182..aa028f594b 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -154,8 +154,8 @@ int32_t mgmtInitMnodes() { return -1; } - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); mTrace("table:mnodes table is created"); return TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mnodeProfile.c similarity index 97% rename from src/mnode/src/mgmtProfile.c rename to src/mnode/src/mnodeProfile.c index 6667bff052..3cf340db4a 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -672,7 +672,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn return numOfRows; } -void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { +void mgmtProcessKillQueryMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); @@ -696,7 +696,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { mgmtDecUserRef(pUser); } -void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { +void mgmtProcessKillStreamMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); @@ -720,7 +720,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { mgmtDecUserRef(pUser); } -void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { +void mgmtProcessKillConnectionMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); @@ -745,12 +745,12 @@ void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { } int32_t mgmtInitProfile() { - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mgmtGetQueryMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mgmtRetrieveQueries); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mgmtGetConnsMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mgmtRetrieveConns); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mgmtGetStreamMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mgmtRetrieveStreams); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mgmtGetQueryMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mgmtRetrieveQueries); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mgmtGetConnsMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mgmtRetrieveConns); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mgmtGetStreamMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mgmtRetrieveStreams); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mgmtProcessKillQueryMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mgmtProcessKillStreamMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mgmtProcessKillConnectionMsg); diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c new file mode 100644 index 0000000000..8d06113f8a --- /dev/null +++ b/src/mnode/src/mnodeRead.c @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosdef.h" +#include "tsched.h" +#include "tbalance.h" +#include "tgrant.h" +#include "ttimer.h" +#include "tglobal.h" +#include "mnode.h" +#include "dnode.h" +#include "mgmtDef.h" +#include "mgmtInt.h" +#include "mgmtServer.h" +#include "mgmtAcct.h" +#include "mgmtDnode.h" +#include "mgmtMnode.h" +#include "mgmtDb.h" +#include "mgmtSdb.h" +#include "mgmtVgroup.h" +#include "mgmtUser.h" +#include "mgmtTable.h" +#include "mgmtShell.h" + +static void (*tsMnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *); + +void mnodeAddReadMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *pMsg)) { + tsMnodeProcessReadMsgFp[msgType] = fp; +} + +int32_t mnodeProcessRead(SMnodeMsg *pMsg) { + SRpcMsg *rpcMsg = &pMsg->rpcMsg; + if (rpcMsg->pCont == NULL) { + mError("%p, msg:%s content is null", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_INVALID_MSG_LEN; + } + + if (!sdbIsMaster()) { + SMnodeRsp *rpcRsp = &pMsg->rpcRsp; + SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); + mgmtGetMnodeIpSetForShell(ipSet); + rpcRsp->rsp = ipSet; + rpcRsp->len = sizeof(SRpcIpSet); + + mTrace("%p, msg:%s will be redireced, inUse:%d", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], ipSet->inUse); + for (int32_t i = 0; i < ipSet->numOfIps; ++i) { + mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + } + + return TSDB_CODE_REDIRECT; + } + + if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { + mError("%p, msg:%s not processed, grant time expired", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_GRANT_EXPIRED; + } + + if (tsMnodeProcessReadMsgFp[rpcMsg->msgType] == NULL) { + mError("%p, msg:%s not processed, no handle exist", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_MSG_NOT_PROCESSED; + } + + if (!mnodeInitMsg(pMsg)) { + mError("%p, msg:%s not processed, reason:%s", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], tstrerror(terrno)); + return terrno; + } + + return (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg); +} diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mnodeSdb.c similarity index 100% rename from src/mnode/src/mgmtSdb.c rename to src/mnode/src/mnodeSdb.c diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mnodeShow.c similarity index 72% rename from src/mnode/src/mgmtShell.c rename to src/mnode/src/mnodeShow.c index ccbed350dc..8138fafc06 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mnodeShow.c @@ -41,142 +41,52 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); -static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); -static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); -static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); -static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); -static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); -static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg); +static void mgmtProcessShowMsg(SMnodeMsg *queuedMsg); +static void mgmtProcessRetrieveMsg(SMnodeMsg *queuedMsg); +static void mgmtProcessHeartBeatMsg(SMnodeMsg *queuedMsg); +static void mgmtProcessConnectMsg(SMnodeMsg *queuedMsg); +static void mgmtProcessUseMsg(SMnodeMsg *queuedMsg); static void mgmtFreeShowObj(void *data); -void *tsMgmtTmr; -static void *tsMgmtTranQhandle = NULL; -static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; static void *tsQhandleCache = NULL; -static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; -static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; - -int32_t mgmtInitShell() { - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE, mgmtProcessRetrieveMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg); +static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; +static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; + +void mnodeInitShow() { + mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); + mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE, mgmtProcessRetrieveMsg); + mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); + mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); + mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg); - tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND"); - tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT"); tsQhandleCache = taosCacheInitWithCb(tsMgmtTmr, 10, mgmtFreeShowObj); - - return 0; } -void mgmtCleanUpShell() { - if (tsMgmtTmr != NULL){ - taosTmrCleanUp(tsMgmtTmr); - tsMgmtTmr = NULL; - } - +void mnodeCleanUpShow() { if (tsQhandleCache != NULL) { taosCacheCleanup(tsQhandleCache); tsQhandleCache = NULL; } - - if (tsMgmtTranQhandle != NULL) { - taosCleanUpScheduler(tsMgmtTranQhandle); - tsMgmtTranQhandle = NULL; - } } -void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SQueuedMsg *queuedMsg)) { - tsMgmtProcessShellMsgFp[showType] = fp; +void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp) { + tsMnodeShowMetaFp[showType] = fp; } -void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp) { - tsMgmtShowMetaFp[showType] = fp; +void mnodeAddShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { + tsMnodeShowRetrieveFp[msgType] = fp; } -void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { - tsMgmtShowRetrieveFp[msgType] = fp; -} +int32_t mnodeProcessRead(int msgType, void *pCont, int32_t contLen, SRspRet *ret) { + if (vnodeProcessReadMsgFp[msgType] == NULL) + return TSDB_CODE_MSG_NOT_PROCESSED; -void mgmtProcessTranRequest(SSchedMsg *sched) { - SQueuedMsg *queuedMsg = sched->msg; - (*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg); - mgmtFreeQueuedMsg(queuedMsg); -} - -void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { - SSchedMsg schedMsg; - schedMsg.msg = queuedMsg; - schedMsg.fp = mgmtProcessTranRequest; - taosScheduleTask(tsMgmtTranQhandle, &schedMsg); -} - -static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) { - mgmtAddToShellQueue(param); -} + if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_NOT_ACTIVE_VNODE; -void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { - void *unUsed = NULL; - taosTmrReset(mgmtDoDealyedAddToShellQueue, 300, queuedMsg, tsMgmtTmr, &unUsed); + return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); } -void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { - - mTrace("%p, msg:%s will be processed", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); - - if (rpcMsg->pCont == NULL) { - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); - return; - } - - if (!sdbIsMaster()) { - SRpcConnInfo connInfo; - rpcGetConnInfo(rpcMsg->handle, &connInfo); - - SRpcIpSet ipSet = {0}; - mgmtGetMnodeIpSet(&ipSet); - mTrace("conn from shell ip:%s user:%s redirect msg, inUse:%d", taosIpStr(connInfo.clientIp), connInfo.user, ipSet.inUse); - for (int32_t i = 0; i < ipSet.numOfIps; ++i) { - mTrace("mnode index:%d ip:%s:%d", i, ipSet.fqdn[i], htons(ipSet.port[i])); - } - - rpcSendRedirectRsp(rpcMsg->handle, &ipSet); - return; - } - - if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED); - rpcFreeCont(rpcMsg->pCont); - return; - } - - if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) { - mgmtProcessUnSupportMsg(rpcMsg); - rpcFreeCont(rpcMsg->pCont); - return; - } - - SQueuedMsg *pMsg = mgmtMallocQueuedMsg(rpcMsg); - if (pMsg == NULL) { - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); - rpcFreeCont(rpcMsg->pCont); - return; - } - - if (mgmtCheckMsgReadOnly(pMsg)) { - (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg); - mgmtFreeQueuedMsg(pMsg); - } else { - if (!pMsg->pUser->writeAuth) { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); - mgmtFreeQueuedMsg(pMsg); - } else { - mgmtAddToShellQueue(pMsg); - } - } -} char *mgmtGetShowTypeStr(int32_t showType) { switch (showType) { @@ -200,14 +110,14 @@ char *mgmtGetShowTypeStr(int32_t showType) { } } -static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { +static void mgmtProcessShowMsg(SMnodeMsg *pMsg) { SCMShowMsg *pShowMsg = pMsg->pCont; if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE); return; } - if (!tsMgmtShowMetaFp[pShowMsg->type] || !tsMgmtShowRetrieveFp[pShowMsg->type]) { + if (!tsMnodeShowMetaFp[pShowMsg->type] || !tsMnodeShowRetrieveFp[pShowMsg->type]) { mError("show type:%s is not support", mgmtGetShowTypeStr(pShowMsg->type)); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT); return; @@ -232,7 +142,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { pShowRsp->qhandle = htobe64((uint64_t) pShow); mTrace("show:%p, type:%s, start to get meta", pShow, mgmtGetShowTypeStr(pShowMsg->type)); - int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle); + int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle); if (code == 0) { SRpcMsg rpcRsp = { .handle = pMsg->thandle, @@ -252,7 +162,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { } } -static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { +static void mgmtProcessRetrieveMsg(SMnodeMsg *pMsg) { int32_t rowsToRead = 0; int32_t size = 0; int32_t rowsRead = 0; @@ -291,7 +201,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { // if free flag is set, client wants to clean the resources if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) - rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle); + rowsRead = (*tsMnodeShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle); if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS; rpcFreeCont(pRsp); @@ -318,7 +228,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { } } -static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { +static void mgmtProcessHeartBeatMsg(SMnodeMsg *pMsg) { SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); if (pHBRsp == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); @@ -347,7 +257,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); } -static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { +static void mgmtProcessConnectMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SCMConnectMsg *pConnectMsg = pMsg->pCont; @@ -407,7 +317,7 @@ connect_over: rpcSendResponse(&rpcRsp); } -static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { +static void mgmtProcessUseMsg(SMnodeMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SCMUseDbMsg *pUseDbMsg = pMsg->pCont; @@ -426,7 +336,7 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { /** * check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one. */ -static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) { +static bool mgmtCheckTableMetaMsgReadOnly(SMnodeMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->pCont; if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pInfo->tableId); if (pMsg->pTable != NULL) return true; @@ -441,7 +351,7 @@ static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) { return true; } -static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) { +static bool mgmtCheckMsgReadOnly(SMnodeMsg *pMsg) { if (pMsg->msgType == TSDB_MSG_TYPE_CM_TABLE_META) { return mgmtCheckTableMetaMsgReadOnly(pMsg); } @@ -514,23 +424,30 @@ void mgmtFreeQhandle(void *qhandle, bool forceRemove) { taosCacheRelease(tsQhandleCache, &qhandle, forceRemove); } -void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { +void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg, SRspRet *pRet) { SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { + terrno = TSDB_CODE_INVALID_USER; + return NULL; + } + + SMnodeMsg *pMsg = calloc(1, sizeof(SMnodeMsg)); + if (pMsg == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } - SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg)); pMsg->thandle = rpcMsg->handle; pMsg->msgType = rpcMsg->msgType; pMsg->contLen = rpcMsg->contLen; - pMsg->pCont = rpcMsg->pCont; - pMsg->pUser = pUser; + pMsg->pCont = rpcMsg->pCont; + pMsg->pUser = pUser; + pMsg->pRet = pRet; return pMsg; } -void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { +void mgmtFreeQueuedMsg(SMnodeMsg *pMsg) { if (pMsg != NULL) { rpcFreeCont(pMsg->pCont); if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser); @@ -543,8 +460,8 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { } } -void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { - SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg)); +void* mgmtCloneQueuedMsg(SMnodeMsg *pSrcMsg) { + SMnodeMsg *pDestMsg = calloc(1, sizeof(SMnodeMsg)); pDestMsg->thandle = pSrcMsg->thandle; pDestMsg->msgType = pSrcMsg->msgType; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mnodeTable.c similarity index 97% rename from src/mnode/src/mgmtTable.c rename to src/mnode/src/mnodeTable.c index 53fbd64f87..609be0f542 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mnodeTable.c @@ -58,27 +58,27 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); -static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg); -static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg); +static void mgmtProcessCreateTableMsg(SMnodeMsg *queueMsg); +static void mgmtProcessCreateSuperTableMsg(SMnodeMsg *pMsg); +static void mgmtProcessCreateChildTableMsg(SMnodeMsg *pMsg); static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg); -static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); -static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropTableMsg(SMnodeMsg *queueMsg); +static void mgmtProcessDropSuperTableMsg(SMnodeMsg *pMsg); static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg); -static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropChildTableMsg(SMnodeMsg *pMsg); static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg); -static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg); -static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessSuperTableVgroupMsg(SMnodeMsg *queueMsg); +static void mgmtProcessMultiTableMetaMsg(SMnodeMsg *queueMsg); static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); -static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); -static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg); -static void mgmtGetChildTableMeta(SQueuedMsg *pMsg); -static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg); +static void mgmtProcessTableMetaMsg(SMnodeMsg *queueMsg); +static void mgmtGetSuperTableMeta(SMnodeMsg *pMsg); +static void mgmtGetChildTableMeta(SMnodeMsg *pMsg); +static void mgmtAutoCreateChildTable(SMnodeMsg *pMsg); -static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessAlterTableMsg(SMnodeMsg *queueMsg); static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName); @@ -559,10 +559,10 @@ int32_t mgmtInitTables() { dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); return TSDB_CODE_SUCCESS; } @@ -655,7 +655,7 @@ static void mgmtExtractTableName(char* tableId, char* name) { } } -static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessCreateTableMsg(SMnodeMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pCreate->db); @@ -689,7 +689,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { } } -static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropTableMsg(SMnodeMsg *pMsg) { SCMDropTableMsg *pDrop = pMsg->pCont; if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { @@ -726,7 +726,7 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } } -static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { +static void mgmtProcessTableMetaMsg(SMnodeMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->pCont; pInfo->createFlag = htons(pInfo->createFlag); mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->thandle, pInfo->createFlag); @@ -755,7 +755,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { } } -static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) { @@ -812,7 +812,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { } } -static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropSuperTableMsg(SMnodeMsg *pMsg) { SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; if (pStable->numOfTables != 0) { SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash); @@ -1239,7 +1239,7 @@ static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTa return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); } -static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { +static void mgmtGetSuperTableMeta(SMnodeMsg *pMsg) { SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); pMeta->uid = htobe64(pTable->uid); @@ -1263,7 +1263,7 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { mTrace("stable:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); } -static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { +static void mgmtProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { SCMSTableVgroupMsg *pInfo = pMsg->pCont; int32_t numOfTable = htonl(pInfo->numOfTables); @@ -1487,7 +1487,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj return pTable; } -static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessCreateChildTableMsg(SMnodeMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); if (code != TSDB_CODE_SUCCESS) { @@ -1536,7 +1536,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { } SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); + SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pMsg->pTable; newMsg->maxRetry = 10; SRpcMsg rpcMsg = { @@ -1550,7 +1550,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { dnodeSendMsgToDnode(&ipSet, &rpcMsg); } -static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropChildTableMsg(SMnodeMsg *pMsg) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; if (pMsg->pVgroup == NULL) pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); if (pMsg->pVgroup == NULL) { @@ -1575,7 +1575,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup); mPrint("table:%s, send drop ctable msg", pDrop->tableId); - SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); + SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pMsg->pTable; SRpcMsg rpcMsg = { .handle = newMsg, @@ -1695,7 +1695,7 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pT return numOfCols * sizeof(SSchema); } -static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { +static int32_t mgmtDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { SDbObj *pDb = pMsg->pDb; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; @@ -1740,7 +1740,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { return TSDB_CODE_SUCCESS; } -static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) { +static void mgmtAutoCreateChildTable(SMnodeMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->pCont; STagData* pTag = (STagData*)pInfo->tags; @@ -1760,7 +1760,7 @@ static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) { memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg)); - SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); + SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; newMsg->pCont = pCreateMsg; @@ -1768,7 +1768,7 @@ static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) { mgmtAddToShellQueue(newMsg); } -static void mgmtGetChildTableMeta(SQueuedMsg *pMsg) { +static void mgmtGetChildTableMeta(SMnodeMsg *pMsg) { STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); if (pMeta == NULL) { mError("table:%s, failed to get table meta, no enough memory", pMsg->pTable->tableId); @@ -1926,7 +1926,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; - SQueuedMsg *queueMsg = rpcMsg->handle; + SMnodeMsg *queueMsg = rpcMsg->handle; queueMsg->received++; SChildTableObj *pTable = queueMsg->ahandle; @@ -1974,7 +1974,7 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; - SQueuedMsg *queueMsg = rpcMsg->handle; + SMnodeMsg *queueMsg = rpcMsg->handle; queueMsg->received++; SChildTableObj *pTable = queueMsg->ahandle; @@ -2020,7 +2020,7 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { mTrace("alter table rsp received, handle:%p code:%s", rpcMsg->handle, tstrerror(rpcMsg->code)); } -static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { +static void mgmtProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { SCMMultiTableInfoMsg *pInfo = pMsg->pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); @@ -2207,7 +2207,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, return numOfRows; } -static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { +static void mgmtProcessAlterTableMsg(SMnodeMsg *pMsg) { SCMAlterTableMsg *pAlter = pMsg->pCont; mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mnodeUser.c similarity index 96% rename from src/mnode/src/mgmtUser.c rename to src/mnode/src/mnodeUser.c index 62a98c4170..e346d804c2 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mnodeUser.c @@ -34,9 +34,9 @@ static void * tsUserSdb = NULL; static int32_t tsUserUpdateSize = 0; static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); -static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); -static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); +static void mgmtProcessCreateUserMsg(SMnodeMsg *pMsg); +static void mgmtProcessAlterUserMsg(SMnodeMsg *pMsg); +static void mgmtProcessDropUserMsg(SMnodeMsg *pMsg); static void mgmtProcessAuthMsg(SRpcMsg *rpcMsg); static int32_t mgmtUserActionDestroy(SSdbOper *pOper) { @@ -139,8 +139,8 @@ int32_t mgmtInitUsers() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_USER, mgmtProcessCreateUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_USER, mgmtProcessAlterUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_USER, mgmtProcessDropUserMsg); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_AUTH, mgmtProcessAuthMsg); mTrace("table:%s, hash is created", tableDesc.tableName); @@ -344,7 +344,7 @@ SUserObj *mgmtGetUserFromConn(void *pConn) { } } -static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) { +static void mgmtProcessCreateUserMsg(SMnodeMsg *pMsg) { int32_t code; SUserObj *pOperUser = pMsg->pUser; @@ -362,7 +362,7 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, code); } -static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { +static void mgmtProcessAlterUserMsg(SMnodeMsg *pMsg) { int32_t code; SUserObj *pOperUser = pMsg->pUser; @@ -457,7 +457,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { mgmtDecUserRef(pUser); } -static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { +static void mgmtProcessDropUserMsg(SMnodeMsg *pMsg) { int32_t code; SUserObj *pOperUser = pMsg->pUser; diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mnodeVgroup.c similarity index 98% rename from src/mnode/src/mgmtVgroup.c rename to src/mnode/src/mnodeVgroup.c index 960863d665..ffc5487a42 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -220,8 +220,8 @@ int32_t mgmtInitVgroups() { return -1; } - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); @@ -297,7 +297,7 @@ void *mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup) { return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup); } -void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { +void mgmtCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) { SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; @@ -617,7 +617,7 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; - SQueuedMsg *queueMsg = rpcMsg->handle; + SMnodeMsg *queueMsg = rpcMsg->handle; queueMsg->received++; if (rpcMsg->code == TSDB_CODE_SUCCESS) { queueMsg->code = rpcMsg->code; @@ -632,7 +632,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (queueMsg->received != queueMsg->expected) return; if (queueMsg->received == queueMsg->successed) { - SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); + SMnodeMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); mgmtAddToShellQueue(newMsg); } else { SSdbOper oper = { @@ -684,7 +684,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { mTrace("drop vnode rsp is received, handle:%p", rpcMsg->handle); if (rpcMsg->handle == NULL) return; - SQueuedMsg *queueMsg = rpcMsg->handle; + SMnodeMsg *queueMsg = rpcMsg->handle; queueMsg->received++; if (rpcMsg->code == TSDB_CODE_SUCCESS) { queueMsg->code = rpcMsg->code; @@ -708,7 +708,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { code = TSDB_CODE_SDB_ERROR; } - SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); + SMnodeMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); mgmtAddToShellQueue(newMsg); queueMsg->pCont = NULL; diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c new file mode 100644 index 0000000000..0fb4cf3d49 --- /dev/null +++ b/src/mnode/src/mnodeWrite.c @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosdef.h" +#include "tsched.h" +#include "tbalance.h" +#include "tgrant.h" +#include "ttimer.h" +#include "tglobal.h" +#include "dnode.h" +#include "mgmtDef.h" +#include "mgmtInt.h" +#include "mgmtServer.h" +#include "mgmtAcct.h" +#include "mgmtDnode.h" +#include "mgmtMnode.h" +#include "mgmtDb.h" +#include "mgmtSdb.h" +#include "mgmtVgroup.h" +#include "mgmtUser.h" +#include "mgmtTable.h" +#include "mgmtShell.h" + +static void (*tsMnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *); + +void mnodeAddWriteMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *pMsg)) { + tsMnodeProcessWriteMsgFp[msgType] = fp; +} + +int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { + SRpcMsg *rpcMsg = &pMsg->rpcMsg; + if (rpcMsg->pCont == NULL) { + mError("%p, msg:%s content is null", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_INVALID_MSG_LEN; + } + + if (!sdbIsMaster()) { + SMnodeRsp *rpcRsp = &pMsg->rpcRsp; + SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet)); + mgmtGetMnodeIpSetForShell(ipSet); + rpcRsp->rsp = ipSet; + rpcRsp->len = sizeof(SRpcIpSet); + + mTrace("%p, msg:%s will be redireced, inUse:%d", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], ipSet->inUse); + for (int32_t i = 0; i < ipSet->numOfIps; ++i) { + mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); + } + + return TSDB_CODE_REDIRECT; + } + + if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { + mError("%p, msg:%s not processed, grant time expired", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_GRANT_EXPIRED; + } + + if (tsMnodeProcessReadMsgFp[rpcMsg->msgType] == NULL) { + mError("%p, msg:%s not processed, no handle exist", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_MSG_NOT_PROCESSED; + } + + if (!mnodeInitMsg(pMsg)) { + mError("%p, msg:%s not processed, reason:%s", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], tstrerror(terrno)); + return terrno; + } + + if (!pMsg->pUser->writeAuth) { + mError("%p, msg:%s not processed, no rights", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + return TSDB_CODE_NO_RIGHTS; + } + + return (*tsMnodeProcessWriteMsgFp[rpcMsg->msgType])(pMsg); +} + +static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) { + mgmtAddToShellQueue(param); +} + +void mgmtDealyedAddToShellQueue(SMnodeMsg *queuedMsg) { + void *unUsed = NULL; + taosTmrReset(mgmtDoDealyedAddToShellQueue, 300, queuedMsg, tsMgmtTmr, &unUsed); +} \ No newline at end of file -- GitLab