提交 3b54b745 编写于 作者: S Shengliang Guan

[TD-335] rename mgmt to mnode

上级 406e170f
...@@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(taosd ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4) TARGET_LINK_LIBRARIES(taosd taos_static monitor http mqtt tsdb twal vnode cJson lz4)
IF (TD_ACCOUNT) IF (TD_ACCOUNT)
TARGET_LINK_LIBRARIES(taosd account) TARGET_LINK_LIBRARIES(taosd account)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MREAD_H
#define TDENGINE_DNODE_MREAD_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitMnodeRead();
void dnodeCleanupMnodeRead();
void dnodeDispatchToMnodeReadQueue(SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MWRITE_H
#define TDENGINE_DNODE_MWRITE_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitMnodeWrite();
void dnodeCleanupMnodeWrite();
void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif
...@@ -32,6 +32,8 @@ void* dnodeGetVnodeWal(void *pVnode); ...@@ -32,6 +32,8 @@ void* dnodeGetVnodeWal(void *pVnode);
void* dnodeGetVnodeTsdb(void *pVnode); void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode); void dnodeReleaseVnode(void *pVnode);
void dnodeSendRediretMsg(SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MMGMT_H
#define TDENGINE_DNODE_MMGMT_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitMnodeMgmt();
void dnodeCleanupMnodeMgmt();
void dnodeDispatchToMnodeMgmtQueue(SRpcMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "trpc.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeVMgmt.h"
#include "dnodeMRead.h"
typedef struct {
pthread_t thread;
int32_t workerId;
} SMReadWorker;
typedef struct {
int32_t num;
SMReadWorker *readWorker;
} SMReadWorkerPool;
static SMReadWorkerPool tsMReadPool;
static taos_qset tsMReadQset;
static taos_queue tsMReadQueue;
static void *dnodeProcessMnodeReadQueue(void *param);
int32_t dnodeInitMnodeRead() {
tsMReadQset = taosOpenQset();
tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2;
tsMReadPool.num = MAX(2, tsMReadPool.num);
tsMReadPool.num = MIN(4, tsMReadPool.num);
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num);
if (tsMReadPool.readWorker == NULL) return -1;
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i;
}
dPrint("dnode mread is opened");
return 0;
}
void dnodeCleanupMnodeRead() {
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMReadQset);
}
}
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
}
taosCloseQset(tsMReadQset);
free(tsMReadPool.readWorker);
dPrint("dnode mread is closed");
}
int32_t dnodeAllocateMnodeRqueue() {
tsMReadQueue = taosOpenQueue();
if (tsMReadQueue == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL);
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeReadQueue, pWorker) != 0) {
dError("failed to create thread to process mread queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
dTrace("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num);
}
dTrace("dnode mread queue:%p is allocated", tsMReadQueue);
return TSDB_CODE_SUCCESS;
}
void dnodeFreeMnodeRqueue() {
taosCloseQueue(tsMReadQueue);
tsMReadQueue = NULL;
}
void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMReadQueue == NULL) {
dnodeSendRediretMsg(pMsg);
return;
}
SMnodeMsg *pRead = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
pRead->rpcMsg = *pMsg;
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
}
static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len,
.code = pRead->rspRet.code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
}
static void *dnodeProcessMnodeReadQueue(void *param) {
SMnodeMsg *pReadMsg;
int32_t type;
void * unUsed;
while (1) {
if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) {
dTrace("dnodeProcessMnodeReadQueue: got no message from qset, exiting...");
break;
}
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessRead(pReadMsg);
dnodeSendRpcMnodeReadRsp(pReadMsg, code);
taosFreeQitem(pReadMsg);
}
return NULL;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "trpc.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeVMgmt.h"
#include "dnodeMWrite.h"
typedef struct {
pthread_t thread;
int32_t workerId;
} SMWriteWorker;
typedef struct {
int32_t num;
SMWriteWorker *writeWorker;
} SMWriteWorkerPool;
static SMWriteWorkerPool tsMWritePool;
static taos_qset tsMWriteQset;
static taos_queue tsMWriteQueue;
static void *dnodeProcessMnodeWriteQueue(void *param);
int32_t dnodeInitMnodeWrite() {
tsMWriteQset = taosOpenQset();
tsMWritePool.num = 1;
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num);
if (tsMWritePool.writeWorker == NULL) return -1;
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i;
}
dPrint("dnode mwrite is opened");
return 0;
}
void dnodeCleanupMnodeWrite() {
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMWriteQset);
}
}
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
}
dPrint("dnode mwrite is closed");
}
int32_t dnodeAllocateMnodeRqueue() {
tsMWriteQueue = taosOpenQueue();
if (tsMWriteQueue == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL);
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeWriteQueue, pWorker) != 0) {
dError("failed to create thread to process mwrite queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
dTrace("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num);
}
dTrace("dnode mwrite queue:%p is allocated", tsMWriteQueue);
return TSDB_CODE_SUCCESS;
}
void dnodeFreeMnodeRqueue() {
taosCloseQueue(tsMWriteQueue);
tsMWriteQueue = NULL;
}
void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
dnodeSendRediretMsg(pMsg);
return;
}
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
pWrite->rpcMsg = *pMsg;
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
static void dnodeSendRpcMnodeWriteRsp(SMnodeMsg *pWrite, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len,
.code = pWrite->rspRet.code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pWrite->rpcMsg.pCont);
}
static void *dnodeProcessMnodeWriteQueue(void *param) {
SMnodeMsg *pWriteMsg;
int32_t type;
void * unUsed;
while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWriteMsg, &unUsed) == 0) {
dTrace("dnodeProcessMnodeWriteQueue: got no message from qset, exiting...");
break;
}
dTrace("%p, msg:%s will be processed", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWriteMsg);
dnodeSendRpcMnodeWriteRsp(pWriteMsg, code);
taosFreeQitem(pWriteMsg);
}
return NULL;
}
...@@ -21,12 +21,15 @@ ...@@ -21,12 +21,15 @@
#include "tglobal.h" #include "tglobal.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMgmt.h" #include "dnodeVMgmt.h"
#include "dnodePeer.h" #include "dnodePeer.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
#include "dnodeShell.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeMMgmt.h"
#include "dnodeShell.h"
static int32_t dnodeInitStorage(); static int32_t dnodeInitStorage();
static void dnodeCleanupStorage(); static void dnodeCleanupStorage();
...@@ -65,8 +68,11 @@ int32_t dnodeInitSystem() { ...@@ -65,8 +68,11 @@ int32_t dnodeInitSystem() {
dPrint("start to initialize TDengine on %s", tsLocalEp); dPrint("start to initialize TDengine on %s", tsLocalEp);
if (dnodeInitStorage() != 0) return -1; if (dnodeInitStorage() != 0) return -1;
if (dnodeInitRead() != 0) return -1; if (dnodeInitVnodeRead() != 0) return -1;
if (dnodeInitWrite() != 0) return -1; if (dnodeInitVnodeWrite() != 0) return -1;
if (dnodeInitMnodeRead() != 0) return -1;
if (dnodeInitMnodeWrite() != 0) return -1;
if (dnodeInitMnodeMgmt() != 0) return -1;
if (dnodeInitClient() != 0) return -1; if (dnodeInitClient() != 0) return -1;
if (dnodeInitServer() != 0) return -1; if (dnodeInitServer() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1; if (dnodeInitMgmt() != 0) return -1;
...@@ -89,8 +95,11 @@ void dnodeCleanUpSystem() { ...@@ -89,8 +95,11 @@ void dnodeCleanUpSystem() {
dnodeCleanupMgmt(); dnodeCleanupMgmt();
dnodeCleanupServer(); dnodeCleanupServer();
dnodeCleanupClient(); dnodeCleanupClient();
dnodeCleanupWrite(); dnodeCleanupMnodeMgmt();
dnodeCleanupRead(); dnodeCleanupMnodeWrite();
dnodeCleanupMnodeRead();
dnodeCleanupVnodeWrite();
dnodeCleanupVnodeRead();
dnodeCleanupStorage(); dnodeCleanupStorage();
taos_cleanup(); taos_cleanup();
taosCloseLog(); taosCloseLog();
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
#include "vnode.h" #include "vnode.h"
#include "mnode.h" #include "mnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMgmt.h" #include "dnodeVMgmt.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeModule.h" #include "dnodeModule.h"
...@@ -274,13 +274,16 @@ void dnodeUpdateIpSet(SRpcIpSet *pIpSet) { ...@@ -274,13 +274,16 @@ void dnodeUpdateIpSet(SRpcIpSet *pIpSet) {
tsMnodeIpSet = *pIpSet; tsMnodeIpSet = *pIpSet;
} }
void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) { void dnodeGetMnodeDnodeIpSet(void *ipSetRaw, bool encode) {
SRpcIpSet *ipSet = ipSetRaw; SRpcIpSet *ipSet = ipSetRaw;
ipSet->numOfIps = tsMnodeInfos.nodeNum; ipSet->numOfIps = tsMnodeInfos.nodeNum;
ipSet->inUse = tsMnodeInfos.inUse; ipSet->inUse = tsMnodeInfos.inUse;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) { for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) {
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]); taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]);
ipSet->port[i] += TSDB_PORT_DNODEDNODE; ipSet->port[i] += TSDB_PORT_DNODEDNODE;
if (encode) {
ipSet->port[i] = htons(ipSet->port[i]);
}
} }
} }
...@@ -590,3 +593,20 @@ int32_t dnodeGetDnodeId() { ...@@ -590,3 +593,20 @@ int32_t dnodeGetDnodeId() {
return tsDnodeCfg.dnodeId; return tsDnodeCfg.dnodeId;
} }
void dnodeSendRediretMsg(SRpcMsg *rpcMsg) {
SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcIpSet ipSet = {0};
dnodeGetMnodeDnodeIpSet(&ipSet);
dTrace("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[rpcMsg->msgType],
taosIpStr(connInfo.clientIp), connInfo.user, ipSet.numOfIps, ipSet.inUse);
for (int i = 0; i < ipSet.numOfIps; ++i) {
dTrace("mnode index:%d %s:%d", i, ipSet.fqdn[i], ipSet.port[i]);
ipSet.port[i] = htons(ipSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "trpc.h"
#include "twal.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeVMgmt.h"
#include "dnodeMWrite.h"
typedef struct {
pthread_t thread;
int32_t workerId;
} SMMgmtWorker;
typedef struct {
int32_t num;
SMMgmtWorker *mgmtWorker;
} SMMgmtWorkerPool;
static SMMgmtWorkerPool tsMMgmtPool;
static taos_qset tsMMgmtQset;
static taos_queue tsMMgmtQueue;
static void *dnodeProcessMnodeMgmtQueue(void *param);
int32_t dnodeInitMnodeMgmt() {
tsMMgmtQset = taosOpenQset();
tsMMgmtPool.num = 1;
tsMMgmtPool.mgmtWorker = (SMMgmtWorker *)calloc(sizeof(SMMgmtWorker), tsMMgmtPool.num);
if (tsMMgmtPool.mgmtWorker == NULL) return -1;
for (int32_t i = 0; i < tsMMgmtPool.num; ++i) {
SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i;
pWorker->workerId = i;
}
dPrint("dnode mmgmt is opened");
return 0;
}
void dnodeCleanupMnodeMgmt() {
for (int32_t i = 0; i < tsMMgmtPool.num; ++i) {
SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMMgmtQset);
}
}
for (int32_t i = 0; i < tsMMgmtPool.num; ++i) {
SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
}
dPrint("dnode mmgmt is closed");
}
int32_t dnodeAllocateMnodeMqueue() {
tsMMgmtQueue = taosOpenQueue();
if (tsMMgmtQueue == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
taosAddIntoQset(tsMMgmtQset, tsMMgmtQueue, NULL);
for (int32_t i = 0; i < tsMMgmtPool.num; ++i) {
SMMgmtWorker *pWorker = tsMMgmtPool.mgmtWorker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodeMgmtQueue, pWorker) != 0) {
dError("failed to create thread to process mmgmt queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
dTrace("dnode mmgmt worker:%d is launched, total:%d", pWorker->workerId, tsMMgmtPool.num);
}
dTrace("dnode mmgmt queue:%p is allocated", tsMMgmtQueue);
return TSDB_CODE_SUCCESS;
}
void dnodeFreeMnodeRqueue() {
taosCloseQueue(tsMMgmtQueue);
tsMMgmtQueue = NULL;
}
void dnodeDispatchToMnodeMgmtQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMMgmtQueue == NULL) {
dnodeSendRediretMsg(pMsg);
return;
}
SMnodeMsg *pMgmt = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
pMgmt->rpcMsg = *pMsg;
taosWriteQitem(tsMMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
}
static void dnodeSendRpcMnodeMgmtRsp(SMnodeMsg *pMgmt, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
SRpcMsg rpcRsp = {
.handle = pMgmt->rpcMsg.handle,
.pCont = pMgmt->rspRet.rsp,
.contLen = pMgmt->rspRet.len,
.code = pMgmt->rspRet.code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMgmt->rpcMsg.pCont);
}
static void *dnodeProcessMnodeMgmtQueue(void *param) {
SMnodeMsg *pMgmtMsg;
int32_t type;
void * unUsed;
while (1) {
if (taosReadQitemFromQset(tsMMgmtQset, &type, (void **)&pMgmtMsg, &unUsed) == 0) {
dTrace("dnodeProcessMnodeMgmtQueue: got no message from qset, exiting...");
break;
}
dTrace("%p, msg:%s will be processed", pMgmtMsg->rpcMsg.ahandle, taosMsg[pMgmtMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessMgmt(pMgmtMsg);
dnodeSendRpcMnodeMgmtRsp(pMgmtMsg, code);
taosFreeQitem(pMgmtMsg);
}
return NULL;
}
...@@ -25,8 +25,10 @@ ...@@ -25,8 +25,10 @@
#include "trpc.h" #include "trpc.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMgmt.h" #include "dnodeVMgmt.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "mnode.h" #include "mnode.h"
extern void dnodeUpdateIpSet(SRpcIpSet *pIpSet); extern void dnodeUpdateIpSet(SRpcIpSet *pIpSet);
...@@ -48,11 +50,11 @@ int32_t dnodeInitServer() { ...@@ -48,11 +50,11 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToDnodeMgmt;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToDnodeMgmt;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodeReadQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodeReadQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMnodeReadQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMnodeWriteQueue;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -167,6 +169,6 @@ void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { ...@@ -167,6 +169,6 @@ void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcIpSet ipSet = {0}; SRpcIpSet ipSet = {0};
dnodeGetMnodeDnodeIpSet(&ipSet); dnodeGetMnodeDnodeIpSet(&ipSet, false);
rpcSendRecv(tsDnodeClientRpc, &ipSet, rpcMsg, rpcRsp); rpcSendRecv(tsDnodeClientRpc, &ipSet, rpcMsg, rpcRsp);
} }
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeShell.h" #include "dnodeShell.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
...@@ -43,35 +45,35 @@ int32_t dnodeInitShell() { ...@@ -43,35 +45,35 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mgmtProcessMsgFromShell;
// the following message shall be treated as mnode query // the following message shall be treated as mnode query
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= mgmtProcessMsgFromShell; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeReadQueue;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include "twal.h" #include "twal.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMgmt.h" #include "dnodeVMgmt.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
#include "vnode.h" #include "vnode.h"
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include "tdataformat.h" #include "tdataformat.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMgmt.h" #include "dnodeVMgmt.h"
typedef struct { typedef struct {
taos_qall qall; taos_qall qall;
......
...@@ -45,7 +45,7 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); ...@@ -45,7 +45,7 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
bool dnodeIsFirstDeploy(); bool dnodeIsFirstDeploy();
char *dnodeGetMnodeMasterEp(); char *dnodeGetMnodeMasterEp();
void dnodeGetMnodeDnodeIpSet(void *ipSet); void dnodeGetMnodeDnodeIpSet(void *ipSet, bool encode);
void * dnodeGetMnodeInfos(); void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
......
...@@ -20,17 +20,38 @@ ...@@ -20,17 +20,38 @@
extern "C" { extern "C" {
#endif #endif
typedef struct {
int len;
int code;
void *rsp;
} SMnodeRsp;
typedef struct {
SRpcMsg rpcMsg;
SMnodeRsp rpcRsp;
} SMnodeMsg;
SMnodeMsg *mnodeCreateMsg(SRpcMsg *rpcMsg);
bool mnodeInitMsg(SMnodeMsg *pMsg);
void mnodeRleaseMsg(SMnodeMsg *pMsg);
int32_t mgmtInitSystem(); int32_t mgmtInitSystem();
int32_t mgmtStartSystem(); int32_t mgmtStartSystem();
void mgmtCleanUpSystem(); void mgmtCleanUpSystem();
void mgmtStopSystem(); void mgmtStopSystem();
void sdbUpdateSync(); void sdbUpdateSync();
void* mnodeGetRqueue(void *);
void* mnodeGetWqueue(int32_t vgId);
bool mnodeIsRunning();
int32_t mnodeProcessRead(SMnodeMsg *pMsg);
int32_t mnodeProcessWrite(SMnodeMsg *pMsg);
int32_t mnodeProcessMgmt(SMnodeMsg *pMsg);
int32_t mgmtRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mgmtRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg); void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg); void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -251,7 +251,7 @@ typedef struct { ...@@ -251,7 +251,7 @@ typedef struct {
SDbObj *pDb; SDbObj *pDb;
SVgObj *pVgroup; SVgObj *pVgroup;
STableObj *pTable; STableObj *pTable;
} SQueuedMsg; } SMnodeMsg;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -42,7 +42,9 @@ void mgmtIncMnodeRef(struct SMnodeObj *pMnode); ...@@ -42,7 +42,9 @@ void mgmtIncMnodeRef(struct SMnodeObj *pMnode);
void mgmtDecMnodeRef(struct SMnodeObj *pMnode); void mgmtDecMnodeRef(struct SMnodeObj *pMnode);
char * mgmtGetMnodeRoleStr(); char * mgmtGetMnodeRoleStr();
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet); void mgmtGetMnodeIpSetForPeer(SRpcIpSet *ipSet);
void mgmtGetMnodeIpSetForShell(SRpcIpSet *ipSet);
void mgmtGetMnodeInfos(void *mnodes); void mgmtGetMnodeInfos(void *mnodes);
void mgmtUpdateMnodeIpSet(); void mgmtUpdateMnodeIpSet();
......
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
extern "C" { extern "C" {
#endif #endif
int32_t mgmtInitServer(); int32_t mnodeInitMgmt();
void mgmtCleanupServer(); void mgmtCleanupMgmt();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,15 +23,15 @@ extern "C" { ...@@ -23,15 +23,15 @@ extern "C" {
int32_t mgmtInitShell(); int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SQueuedMsg *queuedMsg)); void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *queuedMsg));
typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
void mgmtAddToShellQueue(SQueuedMsg *queuedMsg); void mgmtAddToShellQueue(SMnodeMsg *queuedMsg);
void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg); void mgmtDealyedAddToShellQueue(SMnodeMsg *queuedMsg);
void mgmtSendSimpleResp(void *thandle, int32_t code); void mgmtSendSimpleResp(void *thandle, int32_t code);
bool mgmtCheckQhandle(uint64_t qhandle); bool mgmtCheckQhandle(uint64_t qhandle);
...@@ -39,8 +39,8 @@ void *mgmtSaveQhandle(void *qhandle, int32_t size); ...@@ -39,8 +39,8 @@ void *mgmtSaveQhandle(void *qhandle, int32_t size);
void mgmtFreeQhandle(void *qhandle, bool forceRemove); void mgmtFreeQhandle(void *qhandle, bool forceRemove);
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg); void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg);
void *mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg); void *mgmtCloneQueuedMsg(SMnodeMsg *pSrcMsg);
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); void mgmtFreeQueuedMsg(SMnodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -35,7 +35,7 @@ void * mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup); ...@@ -35,7 +35,7 @@ void * mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup);
void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *dnodeId, SVnodeLoad *pVload); void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *dnodeId, SVnodeLoad *pVload);
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb); void mgmtCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
......
...@@ -41,13 +41,13 @@ static void * tsDbSdb = NULL; ...@@ -41,13 +41,13 @@ static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize; static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static void mgmtDropDb(SQueuedMsg *newMsg); static void mgmtDropDb(SMnodeMsg *newMsg);
static int32_t mgmtSetDbDropping(SDbObj *pDb); static int32_t mgmtSetDbDropping(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateDbMsg(SMnodeMsg *pMsg);
static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterDbMsg(SMnodeMsg *pMsg);
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg); static void mgmtProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mgmtDbActionDestroy(SSdbOper *pOper) { static int32_t mgmtDbActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj); tfree(pOper->pObj);
...@@ -150,8 +150,8 @@ int32_t mgmtInitDbs() { ...@@ -150,8 +150,8 @@ int32_t mgmtInitDbs() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mgmtProcessCreateDbMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mgmtProcessCreateDbMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mgmtProcessAlterDbMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mgmtProcessAlterDbMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mgmtProcessDropDbMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mgmtProcessDropDbMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DB, mgmtGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mgmtGetDbMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mgmtRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mgmtRetrieveDbs);
mTrace("table:dbs table is created"); mTrace("table:dbs table is created");
return 0; return 0;
...@@ -748,7 +748,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) { ...@@ -748,7 +748,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) {
return code; return code;
} }
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateDbMsg(SMnodeMsg *pMsg) {
SCMCreateDbMsg *pCreate = pMsg->pCont; SCMCreateDbMsg *pCreate = pMsg->pCont;
pCreate->maxTables = htonl(pCreate->maxTables); pCreate->maxTables = htonl(pCreate->maxTables);
...@@ -935,7 +935,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -935,7 +935,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { static void mgmtProcessAlterDbMsg(SMnodeMsg *pMsg) {
SCMAlterDbMsg *pAlter = pMsg->pCont; SCMAlterDbMsg *pAlter = pMsg->pCont;
mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle); mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle);
...@@ -963,7 +963,7 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { ...@@ -963,7 +963,7 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
} }
static void mgmtDropDb(SQueuedMsg *pMsg) { static void mgmtDropDb(SMnodeMsg *pMsg) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
mPrint("db:%s, drop db from sdb", pDb->name); mPrint("db:%s, drop db from sdb", pDb->name);
...@@ -980,7 +980,7 @@ static void mgmtDropDb(SQueuedMsg *pMsg) { ...@@ -980,7 +980,7 @@ static void mgmtDropDb(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
} }
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropDbMsg(SMnodeMsg *pMsg) {
SCMDropDbMsg *pDrop = pMsg->pCont; SCMDropDbMsg *pDrop = pMsg->pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle); mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle);
...@@ -1022,7 +1022,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { ...@@ -1022,7 +1022,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
SVgObj *pVgroup = pMsg->pDb->pHead; SVgObj *pVgroup = pMsg->pDb->pHead;
if (pVgroup != NULL) { if (pVgroup != NULL) {
mPrint("vgId:%d, will be dropped", pVgroup->vgId); mPrint("vgId:%d, will be dropped", pVgroup->vgId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup; newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes; newMsg->expected = pVgroup->numOfVnodes;
mgmtDropVgroup(pVgroup, newMsg); mgmtDropVgroup(pVgroup, newMsg);
......
...@@ -42,9 +42,9 @@ extern void * tsMnodeSdb; ...@@ -42,9 +42,9 @@ extern void * tsMnodeSdb;
extern void * tsVgroupSdb; extern void * tsVgroupSdb;
static int32_t mgmtCreateDnode(char *ep); static int32_t mgmtCreateDnode(char *ep);
static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateDnodeMsg(SMnodeMsg *pMsg);
static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg); static void mgmtProcessDropDnodeMsg(SMnodeMsg *pMsg);
static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg); static void mgmtProcessCfgDnodeMsg(SMnodeMsg *pMsg);
static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ; static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ;
static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg);
static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
...@@ -153,14 +153,14 @@ int32_t mgmtInitDnodes() { ...@@ -153,14 +153,14 @@ int32_t mgmtInitDnodes() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DNODE, mgmtGetDnodeMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DNODE, mgmtGetDnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, mgmtRetrieveDnodes); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, mgmtRetrieveDnodes);
mTrace("table:dnodes table is created"); mTrace("table:dnodes table is created");
return 0; return 0;
...@@ -236,7 +236,7 @@ void mgmtUpdateDnode(SDnodeObj *pDnode) { ...@@ -236,7 +236,7 @@ void mgmtUpdateDnode(SDnodeObj *pDnode) {
sdbUpdateRow(&oper); sdbUpdateRow(&oper);
} }
void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { void mgmtProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont; SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont;
...@@ -451,7 +451,7 @@ static int32_t mgmtDropDnodeByEp(char *ep) { ...@@ -451,7 +451,7 @@ static int32_t mgmtDropDnodeByEp(char *ep) {
#endif #endif
} }
static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMCreateDnodeMsg *pCreate = pMsg->pCont; SCMCreateDnodeMsg *pCreate = pMsg->pCont;
...@@ -472,7 +472,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { ...@@ -472,7 +472,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) {
} }
static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropDnodeMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMDropDnodeMsg *pDrop = pMsg->pCont; SCMDropDnodeMsg *pDrop = pMsg->pCont;
......
...@@ -35,9 +35,13 @@ ...@@ -35,9 +35,13 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtShell.h" #include "mgmtShell.h"
extern void *tsMgmtTmr; static void *tsMgmtTmr;
static bool tsMgmtIsRunning = false; static bool tsMgmtIsRunning = false;
static void mnodeInitTimer();
static void mnodeCleanupTimer();
static bool mnodeNeedStart() ;
int32_t mgmtStartSystem() { int32_t mgmtStartSystem() {
if (tsMgmtIsRunning) { if (tsMgmtIsRunning) {
mPrint("TDengine mgmt module already started..."); mPrint("TDengine mgmt module already started...");
...@@ -99,7 +103,7 @@ int32_t mgmtStartSystem() { ...@@ -99,7 +103,7 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (mgmtInitServer() < 0) { if (mnodeInitMgmt() < 0) {
return -1; return -1;
} }
...@@ -112,16 +116,11 @@ int32_t mgmtStartSystem() { ...@@ -112,16 +116,11 @@ int32_t mgmtStartSystem() {
} }
int32_t mgmtInitSystem() { int32_t mgmtInitSystem() {
if (mgmtInitShell() != 0) { mnodeInitTimer();
mError("failed to init shell"); mnodeInitRead();
return -1; mnodeInitWrite();
}
struct stat dirstat;
bool fileExist = (stat(tsMnodeDir, &dirstat) == 0);
bool asMaster = (strcmp(tsFirst, tsLocalEp) == 0);
if (asMaster || fileExist) { if (mnodeNeedStart()) {
if (mgmtStartSystem() != 0) { if (mgmtStartSystem() != 0) {
return -1; return -1;
} }
...@@ -133,8 +132,12 @@ int32_t mgmtInitSystem() { ...@@ -133,8 +132,12 @@ int32_t mgmtInitSystem() {
void mgmtCleanUpSystem() { void mgmtCleanUpSystem() {
mPrint("starting to clean up mgmt"); mPrint("starting to clean up mgmt");
tsMgmtIsRunning = false; tsMgmtIsRunning = false;
mgmtCleanUpShell();
mgmtCleanupServer(); mnodeCleanupTimer();
mnodeCleanupRead();
mnodeCleanupWrite();
mgmtCleanupMgmt();
grantCleanUp(); grantCleanUp();
balanceCleanUp(); balanceCleanUp();
sdbCleanUp(); sdbCleanUp();
...@@ -153,9 +156,43 @@ void mgmtStopSystem() { ...@@ -153,9 +156,43 @@ void mgmtStopSystem() {
mTrace("it is a master mgmt node, it could not be stopped"); mTrace("it is a master mgmt node, it could not be stopped");
return; return;
} }
mgmtCleanUpSystem(); mgmtCleanUpSystem();
mPrint("mgmt file is removed"); mPrint("mgmt file is removed");
remove(tsMnodeDir); remove(tsMnodeDir);
} }
void* mnodeGetWqueue(int32_t vgId) {
}
static void mnodeInitTimer() {
if (tsMgmtTmr != NULL) {
tsMgmtTmr = taosTmrInit((tsMaxShellConns)*3, 200, 3600000, "MND");
}
}
static void mnodeCleanupTimer() {
if (tsMgmtTmr != NULL) {
taosTmrCleanUp(tsMgmtTmr);
tsMgmtTmr = NULL;
}
}
static bool mnodeNeedStart() {
struct stat dirstat;
bool fileExist = (stat(tsMnodeDir, &dirstat) == 0);
bool asMaster = (strcmp(tsFirst, tsLocalEp) == 0);
if (asMaster || fileExist) {
return true;
}
return false;
}
...@@ -34,73 +34,38 @@ ...@@ -34,73 +34,38 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); static void (*tsMnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *);
static void *tsMgmtServerQhandle = NULL;
int32_t mgmtInitServer() { void mnodeAddMgmtMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *pMsg)) {
tsMnodeProcessMgmtMsgFp[msgType] = fp;
tsMgmtServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS");
mPrint("server connection to dnode is opened");
return 0;
} }
void mgmtCleanupServer() { int32_t mnodeProcessMgmt(SMnodeMsg *pMsg) {
if (tsMgmtServerQhandle) { SRpcMsg *rpcMsg = &pMsg->rpcMsg;
taosCleanUpScheduler(tsMgmtServerQhandle); if (rpcMsg->pCont == NULL) {
tsMgmtServerQhandle = NULL; mError("%p, msg:%s content is null", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_INVALID_MSG_LEN;
} }
}
void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { if (!sdbIsMaster()) {
mgmtProcessDnodeMsgFp[msgType] = fp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
} SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet));
mgmtGetMnodeIpSetForPeer(ipSet);
static void mgmtProcessRequestFromDnode(SSchedMsg *sched) { rpcRsp->rsp = ipSet;
SRpcMsg *pMsg = sched->msg; rpcRsp->len = sizeof(SRpcIpSet);
(*mgmtProcessDnodeMsgFp[pMsg->msgType])(pMsg);
rpcFreeCont(pMsg->pCont);
free(pMsg);
}
static void mgmtAddToServerQueue(SRpcMsg *pMsg) { mTrace("%p, msg:%s will be redireced, inUse:%d", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], ipSet->inUse);
SSchedMsg schedMsg; for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
schedMsg.msg = pMsg; mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
schedMsg.fp = mgmtProcessRequestFromDnode; }
taosScheduleTask(tsMgmtServerQhandle, &schedMsg);
}
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg) { return TSDB_CODE_REDIRECT;
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType] == NULL) {
mError("%s is not processed in mnode", taosMsg[rpcMsg->msgType]);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED);
rpcFreeCont(rpcMsg->pCont);
} }
if (rpcMsg->pCont == NULL) { if (tsMnodeProcessMgmtMsgFp[rpcMsg->msgType] == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); mError("%p, msg:%s not processed, no handle exist", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return; return TSDB_CODE_MSG_NOT_PROCESSED;
} }
if (!sdbIsMaster()) { return (*tsMnodeProcessMgmtMsgFp[rpcMsg->msgType])(rpcMsg, );
SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcIpSet ipSet = {0};
dnodeGetMnodeDnodeIpSet(&ipSet);
for (int i = 0; i < ipSet.numOfIps; ++i)
ipSet.port[i] = htons(ipSet.port[i]);
mTrace("conn from dnode ip:%s user:%s redirect msg, inUse:%d", taosIpStr(connInfo.clientIp), connInfo.user, ipSet.inUse);
for (int32_t i = 0; i < ipSet.numOfIps; ++i) {
mTrace("mnode index:%d %s:%d", i, ipSet.fqdn[i], htons(ipSet.port[i]));
}
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
return;
}
SRpcMsg *pMsg = malloc(sizeof(SRpcMsg));
memcpy(pMsg, rpcMsg, sizeof(SRpcMsg));
mgmtAddToServerQueue(pMsg);
} }
...@@ -154,8 +154,8 @@ int32_t mgmtInitMnodes() { ...@@ -154,8 +154,8 @@ int32_t mgmtInitMnodes() {
return -1; return -1;
} }
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes);
mTrace("table:mnodes table is created"); mTrace("table:mnodes table is created");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -672,7 +672,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn ...@@ -672,7 +672,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
return numOfRows; return numOfRows;
} }
void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { void mgmtProcessKillQueryMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
...@@ -696,7 +696,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { ...@@ -696,7 +696,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
mgmtDecUserRef(pUser); mgmtDecUserRef(pUser);
} }
void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { void mgmtProcessKillStreamMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
...@@ -720,7 +720,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { ...@@ -720,7 +720,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
mgmtDecUserRef(pUser); mgmtDecUserRef(pUser);
} }
void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { void mgmtProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
...@@ -745,12 +745,12 @@ void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { ...@@ -745,12 +745,12 @@ void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) {
} }
int32_t mgmtInitProfile() { int32_t mgmtInitProfile() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mgmtGetQueryMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mgmtGetQueryMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mgmtRetrieveQueries); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mgmtRetrieveQueries);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mgmtGetConnsMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mgmtGetConnsMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mgmtRetrieveConns); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mgmtRetrieveConns);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mgmtGetStreamMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mgmtGetStreamMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mgmtRetrieveStreams); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mgmtRetrieveStreams);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mgmtProcessKillQueryMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_QUERY, mgmtProcessKillQueryMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mgmtProcessKillStreamMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mgmtProcessKillStreamMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mgmtProcessKillConnectionMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mgmtProcessKillConnectionMsg);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "tbalance.h"
#include "tgrant.h"
#include "ttimer.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtInt.h"
#include "mgmtServer.h"
#include "mgmtAcct.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtDb.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
#include "mgmtUser.h"
#include "mgmtTable.h"
#include "mgmtShell.h"
static void (*tsMnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *);
void mnodeAddReadMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *pMsg)) {
tsMnodeProcessReadMsgFp[msgType] = fp;
}
int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SRpcMsg *rpcMsg = &pMsg->rpcMsg;
if (rpcMsg->pCont == NULL) {
mError("%p, msg:%s content is null", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_INVALID_MSG_LEN;
}
if (!sdbIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet));
mgmtGetMnodeIpSetForShell(ipSet);
rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet);
mTrace("%p, msg:%s will be redireced, inUse:%d", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
}
return TSDB_CODE_REDIRECT;
}
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mError("%p, msg:%s not processed, grant time expired", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_GRANT_EXPIRED;
}
if (tsMnodeProcessReadMsgFp[rpcMsg->msgType] == NULL) {
mError("%p, msg:%s not processed, no handle exist", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED;
}
if (!mnodeInitMsg(pMsg)) {
mError("%p, msg:%s not processed, reason:%s", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], tstrerror(terrno));
return terrno;
}
return (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg);
}
...@@ -41,142 +41,52 @@ ...@@ -41,142 +41,52 @@
typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static void mgmtProcessShowMsg(SMnodeMsg *queuedMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessRetrieveMsg(SMnodeMsg *queuedMsg);
static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessHeartBeatMsg(SMnodeMsg *queuedMsg);
static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); static void mgmtProcessConnectMsg(SMnodeMsg *queuedMsg);
static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg); static void mgmtProcessUseMsg(SMnodeMsg *queuedMsg);
static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg);
static void mgmtFreeShowObj(void *data); static void mgmtFreeShowObj(void *data);
void *tsMgmtTmr;
static void *tsMgmtTranQhandle = NULL;
static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0};
static void *tsQhandleCache = NULL; static void *tsQhandleCache = NULL;
static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowRetrieveFp tsMnodeShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
int32_t mgmtInitShell() { void mnodeInitShow() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_SHOW, mgmtProcessShowMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE, mgmtProcessRetrieveMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE, mgmtProcessRetrieveMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_HEARTBEAT, mgmtProcessHeartBeatMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg);
tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND");
tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT");
tsQhandleCache = taosCacheInitWithCb(tsMgmtTmr, 10, mgmtFreeShowObj); tsQhandleCache = taosCacheInitWithCb(tsMgmtTmr, 10, mgmtFreeShowObj);
return 0;
} }
void mgmtCleanUpShell() { void mnodeCleanUpShow() {
if (tsMgmtTmr != NULL){
taosTmrCleanUp(tsMgmtTmr);
tsMgmtTmr = NULL;
}
if (tsQhandleCache != NULL) { if (tsQhandleCache != NULL) {
taosCacheCleanup(tsQhandleCache); taosCacheCleanup(tsQhandleCache);
tsQhandleCache = NULL; tsQhandleCache = NULL;
} }
if (tsMgmtTranQhandle != NULL) {
taosCleanUpScheduler(tsMgmtTranQhandle);
tsMgmtTranQhandle = NULL;
}
} }
void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SQueuedMsg *queuedMsg)) { void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp) {
tsMgmtProcessShellMsgFp[showType] = fp; tsMnodeShowMetaFp[showType] = fp;
} }
void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp) { void mnodeAddShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) {
tsMgmtShowMetaFp[showType] = fp; tsMnodeShowRetrieveFp[msgType] = fp;
} }
void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { int32_t mnodeProcessRead(int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
tsMgmtShowRetrieveFp[msgType] = fp; if (vnodeProcessReadMsgFp[msgType] == NULL)
} return TSDB_CODE_MSG_NOT_PROCESSED;
void mgmtProcessTranRequest(SSchedMsg *sched) { if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING)
SQueuedMsg *queuedMsg = sched->msg; return TSDB_CODE_NOT_ACTIVE_VNODE;
(*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg);
mgmtFreeQueuedMsg(queuedMsg);
}
void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
SSchedMsg schedMsg;
schedMsg.msg = queuedMsg;
schedMsg.fp = mgmtProcessTranRequest;
taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
}
static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) {
mgmtAddToShellQueue(param);
}
void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
void *unUsed = NULL;
taosTmrReset(mgmtDoDealyedAddToShellQueue, 300, queuedMsg, tsMgmtTmr, &unUsed);
} }
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
mTrace("%p, msg:%s will be processed", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
if (rpcMsg->pCont == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
return;
}
if (!sdbIsMaster()) {
SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcIpSet ipSet = {0};
mgmtGetMnodeIpSet(&ipSet);
mTrace("conn from shell ip:%s user:%s redirect msg, inUse:%d", taosIpStr(connInfo.clientIp), connInfo.user, ipSet.inUse);
for (int32_t i = 0; i < ipSet.numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet.fqdn[i], htons(ipSet.port[i]));
}
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
return;
}
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) {
mgmtProcessUnSupportMsg(rpcMsg);
rpcFreeCont(rpcMsg->pCont);
return;
}
SQueuedMsg *pMsg = mgmtMallocQueuedMsg(rpcMsg);
if (pMsg == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (mgmtCheckMsgReadOnly(pMsg)) {
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg);
mgmtFreeQueuedMsg(pMsg);
} else {
if (!pMsg->pUser->writeAuth) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
mgmtFreeQueuedMsg(pMsg);
} else {
mgmtAddToShellQueue(pMsg);
}
}
}
char *mgmtGetShowTypeStr(int32_t showType) { char *mgmtGetShowTypeStr(int32_t showType) {
switch (showType) { switch (showType) {
...@@ -200,14 +110,14 @@ char *mgmtGetShowTypeStr(int32_t showType) { ...@@ -200,14 +110,14 @@ char *mgmtGetShowTypeStr(int32_t showType) {
} }
} }
static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { static void mgmtProcessShowMsg(SMnodeMsg *pMsg) {
SCMShowMsg *pShowMsg = pMsg->pCont; SCMShowMsg *pShowMsg = pMsg->pCont;
if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE);
return; return;
} }
if (!tsMgmtShowMetaFp[pShowMsg->type] || !tsMgmtShowRetrieveFp[pShowMsg->type]) { if (!tsMnodeShowMetaFp[pShowMsg->type] || !tsMnodeShowRetrieveFp[pShowMsg->type]) {
mError("show type:%s is not support", mgmtGetShowTypeStr(pShowMsg->type)); mError("show type:%s is not support", mgmtGetShowTypeStr(pShowMsg->type));
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OPS_NOT_SUPPORT);
return; return;
...@@ -232,7 +142,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { ...@@ -232,7 +142,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
pShowRsp->qhandle = htobe64((uint64_t) pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow);
mTrace("show:%p, type:%s, start to get meta", pShow, mgmtGetShowTypeStr(pShowMsg->type)); mTrace("show:%p, type:%s, start to get meta", pShow, mgmtGetShowTypeStr(pShowMsg->type));
int32_t code = (*tsMgmtShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle); int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle);
if (code == 0) { if (code == 0) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->thandle, .handle = pMsg->thandle,
...@@ -252,7 +162,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { ...@@ -252,7 +162,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
} }
} }
static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { static void mgmtProcessRetrieveMsg(SMnodeMsg *pMsg) {
int32_t rowsToRead = 0; int32_t rowsToRead = 0;
int32_t size = 0; int32_t size = 0;
int32_t rowsRead = 0; int32_t rowsRead = 0;
...@@ -291,7 +201,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { ...@@ -291,7 +201,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
// if free flag is set, client wants to clean the resources // if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*tsMgmtShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle); rowsRead = (*tsMnodeShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle);
if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS; if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS;
rpcFreeCont(pRsp); rpcFreeCont(pRsp);
...@@ -318,7 +228,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { ...@@ -318,7 +228,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
} }
} }
static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { static void mgmtProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
if (pHBRsp == NULL) { if (pHBRsp == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
...@@ -347,7 +257,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { ...@@ -347,7 +257,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { static void mgmtProcessConnectMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMConnectMsg *pConnectMsg = pMsg->pCont; SCMConnectMsg *pConnectMsg = pMsg->pCont;
...@@ -407,7 +317,7 @@ connect_over: ...@@ -407,7 +317,7 @@ connect_over:
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { static void mgmtProcessUseMsg(SMnodeMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMUseDbMsg *pUseDbMsg = pMsg->pCont; SCMUseDbMsg *pUseDbMsg = pMsg->pCont;
...@@ -426,7 +336,7 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { ...@@ -426,7 +336,7 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
/** /**
* check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one. * check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one.
*/ */
static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) { static bool mgmtCheckTableMetaMsgReadOnly(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont; SCMTableInfoMsg *pInfo = pMsg->pCont;
if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pInfo->tableId); if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pInfo->tableId);
if (pMsg->pTable != NULL) return true; if (pMsg->pTable != NULL) return true;
...@@ -441,7 +351,7 @@ static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) { ...@@ -441,7 +351,7 @@ static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) {
return true; return true;
} }
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg) { static bool mgmtCheckMsgReadOnly(SMnodeMsg *pMsg) {
if (pMsg->msgType == TSDB_MSG_TYPE_CM_TABLE_META) { if (pMsg->msgType == TSDB_MSG_TYPE_CM_TABLE_META) {
return mgmtCheckTableMetaMsgReadOnly(pMsg); return mgmtCheckTableMetaMsgReadOnly(pMsg);
} }
...@@ -514,23 +424,30 @@ void mgmtFreeQhandle(void *qhandle, bool forceRemove) { ...@@ -514,23 +424,30 @@ void mgmtFreeQhandle(void *qhandle, bool forceRemove) {
taosCacheRelease(tsQhandleCache, &qhandle, forceRemove); taosCacheRelease(tsQhandleCache, &qhandle, forceRemove);
} }
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg, SRspRet *pRet) {
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_INVALID_USER;
return NULL;
}
SMnodeMsg *pMsg = calloc(1, sizeof(SMnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL; return NULL;
} }
SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg));
pMsg->thandle = rpcMsg->handle; pMsg->thandle = rpcMsg->handle;
pMsg->msgType = rpcMsg->msgType; pMsg->msgType = rpcMsg->msgType;
pMsg->contLen = rpcMsg->contLen; pMsg->contLen = rpcMsg->contLen;
pMsg->pCont = rpcMsg->pCont; pMsg->pCont = rpcMsg->pCont;
pMsg->pUser = pUser; pMsg->pUser = pUser;
pMsg->pRet = pRet;
return pMsg; return pMsg;
} }
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { void mgmtFreeQueuedMsg(SMnodeMsg *pMsg) {
if (pMsg != NULL) { if (pMsg != NULL) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser); if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser);
...@@ -543,8 +460,8 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { ...@@ -543,8 +460,8 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
} }
} }
void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { void* mgmtCloneQueuedMsg(SMnodeMsg *pSrcMsg) {
SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg)); SMnodeMsg *pDestMsg = calloc(1, sizeof(SMnodeMsg));
pDestMsg->thandle = pSrcMsg->thandle; pDestMsg->thandle = pSrcMsg->thandle;
pDestMsg->msgType = pSrcMsg->msgType; pDestMsg->msgType = pSrcMsg->msgType;
......
...@@ -58,27 +58,27 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -58,27 +58,27 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessCreateTableMsg(SMnodeMsg *queueMsg);
static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateChildTableMsg(SMnodeMsg *pMsg);
static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessDropTableMsg(SMnodeMsg *queueMsg);
static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg); static void mgmtProcessDropSuperTableMsg(SMnodeMsg *pMsg);
static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropSuperTableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg); static void mgmtProcessDropChildTableMsg(SMnodeMsg *pMsg);
static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg);
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg); static void mgmtProcessSuperTableVgroupMsg(SMnodeMsg *queueMsg);
static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessMultiTableMetaMsg(SMnodeMsg *queueMsg);
static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg);
static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessTableMetaMsg(SMnodeMsg *queueMsg);
static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg); static void mgmtGetSuperTableMeta(SMnodeMsg *pMsg);
static void mgmtGetChildTableMeta(SQueuedMsg *pMsg); static void mgmtGetChildTableMeta(SMnodeMsg *pMsg);
static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg); static void mgmtAutoCreateChildTable(SMnodeMsg *pMsg);
static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessAlterTableMsg(SMnodeMsg *queueMsg);
static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName); static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName);
...@@ -559,10 +559,10 @@ int32_t mgmtInitTables() { ...@@ -559,10 +559,10 @@ int32_t mgmtInitTables() {
dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -655,7 +655,7 @@ static void mgmtExtractTableName(char* tableId, char* name) { ...@@ -655,7 +655,7 @@ static void mgmtExtractTableName(char* tableId, char* name) {
} }
} }
static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont; SCMCreateTableMsg *pCreate = pMsg->pCont;
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pCreate->db); if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pCreate->db);
...@@ -689,7 +689,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { ...@@ -689,7 +689,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
} }
} }
static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropTableMsg(SMnodeMsg *pMsg) {
SCMDropTableMsg *pDrop = pMsg->pCont; SCMDropTableMsg *pDrop = pMsg->pCont;
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId); if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
...@@ -726,7 +726,7 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { ...@@ -726,7 +726,7 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
} }
} }
static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { static void mgmtProcessTableMetaMsg(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont; SCMTableInfoMsg *pInfo = pMsg->pCont;
pInfo->createFlag = htons(pInfo->createFlag); pInfo->createFlag = htons(pInfo->createFlag);
mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->thandle, pInfo->createFlag); mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->thandle, pInfo->createFlag);
...@@ -755,7 +755,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { ...@@ -755,7 +755,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
} }
} }
static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont; SCMCreateTableMsg *pCreate = pMsg->pCont;
SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj)); SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj));
if (pStable == NULL) { if (pStable == NULL) {
...@@ -812,7 +812,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { ...@@ -812,7 +812,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
} }
} }
static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable;
if (pStable->numOfTables != 0) { if (pStable->numOfTables != 0) {
SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash); SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash);
...@@ -1239,7 +1239,7 @@ static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTa ...@@ -1239,7 +1239,7 @@ static int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTa
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
} }
static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { static void mgmtGetSuperTableMeta(SMnodeMsg *pMsg) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
pMeta->uid = htobe64(pTable->uid); pMeta->uid = htobe64(pTable->uid);
...@@ -1263,7 +1263,7 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { ...@@ -1263,7 +1263,7 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
mTrace("stable:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); mTrace("stable:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid);
} }
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { static void mgmtProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SCMSTableVgroupMsg *pInfo = pMsg->pCont; SCMSTableVgroupMsg *pInfo = pMsg->pCont;
int32_t numOfTable = htonl(pInfo->numOfTables); int32_t numOfTable = htonl(pInfo->numOfTables);
...@@ -1487,7 +1487,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj ...@@ -1487,7 +1487,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
return pTable; return pTable;
} }
static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont; SCMCreateTableMsg *pCreate = pMsg->pCont;
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1536,7 +1536,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { ...@@ -1536,7 +1536,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
} }
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pMsg->pTable; newMsg->ahandle = pMsg->pTable;
newMsg->maxRetry = 10; newMsg->maxRetry = 10;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -1550,7 +1550,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { ...@@ -1550,7 +1550,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
dnodeSendMsgToDnode(&ipSet, &rpcMsg); dnodeSendMsgToDnode(&ipSet, &rpcMsg);
} }
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropChildTableMsg(SMnodeMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); if (pMsg->pVgroup == NULL) pMsg->pVgroup = mgmtGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) { if (pMsg->pVgroup == NULL) {
...@@ -1575,7 +1575,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { ...@@ -1575,7 +1575,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup);
mPrint("table:%s, send drop ctable msg", pDrop->tableId); mPrint("table:%s, send drop ctable msg", pDrop->tableId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pMsg->pTable; newMsg->ahandle = pMsg->pTable;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = newMsg, .handle = newMsg,
...@@ -1695,7 +1695,7 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pT ...@@ -1695,7 +1695,7 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pT
return numOfCols * sizeof(SSchema); return numOfCols * sizeof(SSchema);
} }
static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { static int32_t mgmtDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
...@@ -1740,7 +1740,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -1740,7 +1740,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) { static void mgmtAutoCreateChildTable(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont; SCMTableInfoMsg *pInfo = pMsg->pCont;
STagData* pTag = (STagData*)pInfo->tags; STagData* pTag = (STagData*)pInfo->tags;
...@@ -1760,7 +1760,7 @@ static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) { ...@@ -1760,7 +1760,7 @@ static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) {
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg)); memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg));
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); SMnodeMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; newMsg->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
newMsg->pCont = pCreateMsg; newMsg->pCont = pCreateMsg;
...@@ -1768,7 +1768,7 @@ static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) { ...@@ -1768,7 +1768,7 @@ static void mgmtAutoCreateChildTable(SQueuedMsg *pMsg) {
mgmtAddToShellQueue(newMsg); mgmtAddToShellQueue(newMsg);
} }
static void mgmtGetChildTableMeta(SQueuedMsg *pMsg) { static void mgmtGetChildTableMeta(SMnodeMsg *pMsg) {
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
if (pMeta == NULL) { if (pMeta == NULL) {
mError("table:%s, failed to get table meta, no enough memory", pMsg->pTable->tableId); mError("table:%s, failed to get table meta, no enough memory", pMsg->pTable->tableId);
...@@ -1926,7 +1926,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { ...@@ -1926,7 +1926,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return; if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle; SMnodeMsg *queueMsg = rpcMsg->handle;
queueMsg->received++; queueMsg->received++;
SChildTableObj *pTable = queueMsg->ahandle; SChildTableObj *pTable = queueMsg->ahandle;
...@@ -1974,7 +1974,7 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -1974,7 +1974,7 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return; if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle; SMnodeMsg *queueMsg = rpcMsg->handle;
queueMsg->received++; queueMsg->received++;
SChildTableObj *pTable = queueMsg->ahandle; SChildTableObj *pTable = queueMsg->ahandle;
...@@ -2020,7 +2020,7 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { ...@@ -2020,7 +2020,7 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) {
mTrace("alter table rsp received, handle:%p code:%s", rpcMsg->handle, tstrerror(rpcMsg->code)); mTrace("alter table rsp received, handle:%p code:%s", rpcMsg->handle, tstrerror(rpcMsg->code));
} }
static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { static void mgmtProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
SCMMultiTableInfoMsg *pInfo = pMsg->pCont; SCMMultiTableInfoMsg *pInfo = pMsg->pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables); pInfo->numOfTables = htonl(pInfo->numOfTables);
...@@ -2207,7 +2207,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -2207,7 +2207,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
return numOfRows; return numOfRows;
} }
static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessAlterTableMsg(SMnodeMsg *pMsg) {
SCMAlterTableMsg *pAlter = pMsg->pCont; SCMAlterTableMsg *pAlter = pMsg->pCont;
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
......
...@@ -34,9 +34,9 @@ static void * tsUserSdb = NULL; ...@@ -34,9 +34,9 @@ static void * tsUserSdb = NULL;
static int32_t tsUserUpdateSize = 0; static int32_t tsUserUpdateSize = 0;
static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateUserMsg(SMnodeMsg *pMsg);
static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterUserMsg(SMnodeMsg *pMsg);
static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); static void mgmtProcessDropUserMsg(SMnodeMsg *pMsg);
static void mgmtProcessAuthMsg(SRpcMsg *rpcMsg); static void mgmtProcessAuthMsg(SRpcMsg *rpcMsg);
static int32_t mgmtUserActionDestroy(SSdbOper *pOper) { static int32_t mgmtUserActionDestroy(SSdbOper *pOper) {
...@@ -139,8 +139,8 @@ int32_t mgmtInitUsers() { ...@@ -139,8 +139,8 @@ int32_t mgmtInitUsers() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_USER, mgmtProcessCreateUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_USER, mgmtProcessCreateUserMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_USER, mgmtProcessAlterUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_USER, mgmtProcessAlterUserMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_USER, mgmtProcessDropUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_USER, mgmtProcessDropUserMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers);
dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_AUTH, mgmtProcessAuthMsg); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_AUTH, mgmtProcessAuthMsg);
mTrace("table:%s, hash is created", tableDesc.tableName); mTrace("table:%s, hash is created", tableDesc.tableName);
...@@ -344,7 +344,7 @@ SUserObj *mgmtGetUserFromConn(void *pConn) { ...@@ -344,7 +344,7 @@ SUserObj *mgmtGetUserFromConn(void *pConn) {
} }
} }
static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateUserMsg(SMnodeMsg *pMsg) {
int32_t code; int32_t code;
SUserObj *pOperUser = pMsg->pUser; SUserObj *pOperUser = pMsg->pUser;
...@@ -362,7 +362,7 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) { ...@@ -362,7 +362,7 @@ static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
} }
static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { static void mgmtProcessAlterUserMsg(SMnodeMsg *pMsg) {
int32_t code; int32_t code;
SUserObj *pOperUser = pMsg->pUser; SUserObj *pOperUser = pMsg->pUser;
...@@ -457,7 +457,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { ...@@ -457,7 +457,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
mgmtDecUserRef(pUser); mgmtDecUserRef(pUser);
} }
static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropUserMsg(SMnodeMsg *pMsg) {
int32_t code; int32_t code;
SUserObj *pOperUser = pMsg->pUser; SUserObj *pOperUser = pMsg->pUser;
......
...@@ -220,8 +220,8 @@ int32_t mgmtInitVgroups() { ...@@ -220,8 +220,8 @@ int32_t mgmtInitVgroups() {
return -1; return -1;
} }
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg);
...@@ -297,7 +297,7 @@ void *mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup) { ...@@ -297,7 +297,7 @@ void *mgmtGetNextVgroup(void *pIter, SVgObj **pVgroup) {
return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup); return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup);
} }
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { void mgmtCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
strcpy(pVgroup->dbName, pDb->name); strcpy(pVgroup->dbName, pDb->name);
pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->numOfVnodes = pDb->cfg.replications;
...@@ -617,7 +617,7 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { ...@@ -617,7 +617,7 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->handle == NULL) return; if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle; SMnodeMsg *queueMsg = rpcMsg->handle;
queueMsg->received++; queueMsg->received++;
if (rpcMsg->code == TSDB_CODE_SUCCESS) { if (rpcMsg->code == TSDB_CODE_SUCCESS) {
queueMsg->code = rpcMsg->code; queueMsg->code = rpcMsg->code;
...@@ -632,7 +632,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -632,7 +632,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (queueMsg->received != queueMsg->expected) return; if (queueMsg->received != queueMsg->expected) return;
if (queueMsg->received == queueMsg->successed) { if (queueMsg->received == queueMsg->successed) {
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); SMnodeMsg *newMsg = mgmtCloneQueuedMsg(queueMsg);
mgmtAddToShellQueue(newMsg); mgmtAddToShellQueue(newMsg);
} else { } else {
SSdbOper oper = { SSdbOper oper = {
...@@ -684,7 +684,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -684,7 +684,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
mTrace("drop vnode rsp is received, handle:%p", rpcMsg->handle); mTrace("drop vnode rsp is received, handle:%p", rpcMsg->handle);
if (rpcMsg->handle == NULL) return; if (rpcMsg->handle == NULL) return;
SQueuedMsg *queueMsg = rpcMsg->handle; SMnodeMsg *queueMsg = rpcMsg->handle;
queueMsg->received++; queueMsg->received++;
if (rpcMsg->code == TSDB_CODE_SUCCESS) { if (rpcMsg->code == TSDB_CODE_SUCCESS) {
queueMsg->code = rpcMsg->code; queueMsg->code = rpcMsg->code;
...@@ -708,7 +708,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -708,7 +708,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
code = TSDB_CODE_SDB_ERROR; code = TSDB_CODE_SDB_ERROR;
} }
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); SMnodeMsg *newMsg = mgmtCloneQueuedMsg(queueMsg);
mgmtAddToShellQueue(newMsg); mgmtAddToShellQueue(newMsg);
queueMsg->pCont = NULL; queueMsg->pCont = NULL;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "tbalance.h"
#include "tgrant.h"
#include "ttimer.h"
#include "tglobal.h"
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtInt.h"
#include "mgmtServer.h"
#include "mgmtAcct.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtDb.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
#include "mgmtUser.h"
#include "mgmtTable.h"
#include "mgmtShell.h"
static void (*tsMnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SMnodeMsg *);
void mnodeAddWriteMsgHandle(uint8_t msgType, void (*fp)(SMnodeMsg *pMsg)) {
tsMnodeProcessWriteMsgFp[msgType] = fp;
}
int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
SRpcMsg *rpcMsg = &pMsg->rpcMsg;
if (rpcMsg->pCont == NULL) {
mError("%p, msg:%s content is null", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_INVALID_MSG_LEN;
}
if (!sdbIsMaster()) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcIpSet *ipSet = rpcMallocCont(sizeof(SRpcIpSet));
mgmtGetMnodeIpSetForShell(ipSet);
rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet);
mTrace("%p, msg:%s will be redireced, inUse:%d", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
}
return TSDB_CODE_REDIRECT;
}
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mError("%p, msg:%s not processed, grant time expired", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_GRANT_EXPIRED;
}
if (tsMnodeProcessReadMsgFp[rpcMsg->msgType] == NULL) {
mError("%p, msg:%s not processed, no handle exist", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED;
}
if (!mnodeInitMsg(pMsg)) {
mError("%p, msg:%s not processed, reason:%s", rpcMsg->ahandle, taosMsg[rpcMsg->msgType], tstrerror(terrno));
return terrno;
}
if (!pMsg->pUser->writeAuth) {
mError("%p, msg:%s not processed, no rights", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
return TSDB_CODE_NO_RIGHTS;
}
return (*tsMnodeProcessWriteMsgFp[rpcMsg->msgType])(pMsg);
}
static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) {
mgmtAddToShellQueue(param);
}
void mgmtDealyedAddToShellQueue(SMnodeMsg *queuedMsg) {
void *unUsed = NULL;
taosTmrReset(mgmtDoDealyedAddToShellQueue, 300, queuedMsg, tsMgmtTmr, &unUsed);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册