提交 31265ce2 编写于 作者: S slguan

refact mnode

上级 0a912796
......@@ -15,5 +15,5 @@ ADD_SUBDIRECTORY(plugins)
ADD_SUBDIRECTORY(sdb)
ADD_SUBDIRECTORY(mnode)
# ADD_SUBDIRECTORY(vnode)
# ADD_SUBDIRECTORY(dnode)
ADD_SUBDIRECTORY(dnode)
#ADD_SUBDIRECTORY(connector/jdbc)
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
add_library(common ${SOURCE_LIST})
target_include_directories(
common
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc
PUBLIC ${CMAKE_SOURCE_DIR}/src/inc
PUBLIC ${CMAKE_SOURCE_DIR}/src/os/linux/inc
)
\ No newline at end of file
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(common ${SRC})
ENDIF ()
......@@ -47,8 +47,6 @@ extern void *tsMgmtTmr;
extern void *tsMgmtTranQhandle;
extern char tsMgmtDirectory[];
extern int tsDbUpdateSize;
typedef struct {
uint32_t privateIp;
int32_t sid;
......
......@@ -59,6 +59,9 @@ extern "C" {
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 19
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 20
......
......@@ -19,13 +19,11 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "mnode.h"
int32_t mgmtInitBalance();
void mgmtCleanupBalance();
int32_t mgmtAllocVnodes(SVgObj *pVgroup);
char* mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode);
#ifdef __cplusplus
}
......
......@@ -23,6 +23,7 @@ extern "C" {
int32_t mgmtInitDClient();
void mgmtCleanupDClient();
void mgmtAddDClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
......
......@@ -30,10 +30,7 @@ void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendDropTableMsg(SMDDropTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
//
//int32_t mgmtInitDnodeInt();
//void mgmtCleanUpDnodeInt();
......
......@@ -22,24 +22,16 @@ extern "C" {
#include "mnode.h"
void mgmtMonitorDbDrop(void *unused, void *unusedt);
int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter);
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpDbs();
int32_t mgmtInitDbs();
int32_t mgmtUpdateDb(SDbObj *pDb);
void mgmtCleanUpDbs();
SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByTableId(char *db);
int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate);
int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
int32_t mgmtDropDb(SDbObj *pDb);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
void mgmtAddSuperTableIntoDb(SDbObj *pDb);
......
......@@ -19,46 +19,22 @@
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId);
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes);
int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtSendCfgDnodeMsg(char *cont);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode);
int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType);
int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtInitDnodes();
void mgmtCleanUpDnodes();
int32_t mgmtGetDnodesNum();
int32_t mgmtUpdateDnode(SDnodeObj *pDnode);
void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode);
bool mgmtCheckConfigShow(SGlobalConfig *cfg);
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode);
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode);
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
SDnodeObj* mgmtGetDnode(uint32_t ip);
extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip);
extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip);
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode);
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode);
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType);
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode);
void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode);
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId);
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes);
#ifdef __cplusplus
}
......
......@@ -19,9 +19,6 @@
#ifdef __cplusplus
"C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
bool mgmtCheckExpired();
......@@ -30,11 +27,6 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries);
int32_t mgmtCheckTimeSeries(uint32_t timeseries);
int32_t mgmtCheckUserGrant();
int32_t mgmtCheckDbGrant();
int32_t mgmtCheckDnodeGrant();
int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn);
extern void (*mgmtUpdateGrantInfoFp)(void *pCont);
#ifdef __cplusplus
}
......
......@@ -20,19 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
bool mgmtCheckRedirect(void *handle);
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp);
int32_t mgmtRemoveMnode(uint32_t privateIp);
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifdef __cplusplus
}
#endif
......
......@@ -19,27 +19,15 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "mnode.h"
int32_t mgmtInitProfile();
void mgmtCleanUpProfile();
bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle);
int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg);
int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtKillQuery(char *qidstr, void *pConn);
int32_t mgmtKillStream(char *qidstr, void *pConn);
int32_t mgmtKillConnection(char *qidstr, void *pConn);
enum {
TSDB_PROCESS_CREATE_VGROUP,
TSDB_PROCESS_CREATE_VGROUP_GET_META,
......
......@@ -30,23 +30,6 @@ typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, vo
void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
//extern int32_t (*mgmtCheckRedirect)(void *pConn);
//
///*
// * If table not exist, will create it
// */
//void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
//
///*
// * If vgroup not exist, will create vgroup
// */
//void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
//
///*
// * If vgroup create returned, will then create table
// */
//void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
#ifdef __cplusplus
}
#endif
......
......@@ -30,8 +30,6 @@ int32_t mgmtInitSuperTables();
void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId);
int32_t mgmtGetShowSuperTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
......
......@@ -26,7 +26,9 @@ extern "C" {
#include "mnode.h"
int32_t mgmtInitTables();
void mgmtCleanUpTables();
STableInfo* mgmtGetTable(char *tableId);
STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
......@@ -34,9 +36,6 @@ int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpMeters();
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
......@@ -45,6 +44,9 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
#ifdef __cplusplus
}
......
......@@ -24,6 +24,7 @@ extern "C" {
int32_t mgmtInitUsers();
void mgmtCleanUpUsers();
SUserObj *mgmtGetUser(char *name);
SUserObj *mgmtGetUserFromConn(void *pConn);
#ifdef __cplusplus
}
......
......@@ -33,9 +33,6 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb);
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
void mgmtUpdateVgroup(SVgObj *pVgroup);
int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtSetVgroupIdPool();
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
......@@ -43,6 +40,8 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
......
......@@ -14,24 +14,13 @@
*/
#define _DEFAULT_SOURCE
#include "tglobalcfg.h"
#include "tmodule.h"
#include "tstatus.h"
#include "ttime.h"
#include "mgmtBalance.h"
#include "mgmtDnode.h"
void (*mgmtStartBalanceTimerFp)(int64_t mseconds) = NULL;
int32_t (*mgmtInitBalanceFp)() = NULL;
void (*mgmtCleanupBalanceFp)() = NULL;
int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL;
char * (*mgmtGetVnodeStatusFp)(SVgObj *pVgroup, SVnodeGid *pVnode) = NULL;
void mgmtStartBalanceTimer(int64_t mseconds) {
if (mgmtStartBalanceTimerFp) {
(*mgmtStartBalanceTimerFp)(mseconds);
}
}
int32_t mgmtInitBalance() {
if (mgmtInitBalanceFp) {
......@@ -55,11 +44,11 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
SDnodeObj *pDnode = mgmtGetDnode(0);
if (pDnode == NULL) return TSDB_CODE_OTHERS;
int selectedVnode = -1;
int lastAllocVode = pDnode->lastAllocVnode;
int32_t selectedVnode = -1;
int32_t lastAllocVode = pDnode->lastAllocVnode;
for (int i = 0; i < pDnode->numOfVnodes; i++) {
int vnode = (i + lastAllocVode) % pDnode->numOfVnodes;
for (int32_t i = 0; i < pDnode->numOfVnodes; i++) {
int32_t vnode = (i + lastAllocVode) % pDnode->numOfVnodes;
if (pDnode->vload[vnode].vgId == 0 && pDnode->vload[vnode].status == TSDB_VN_STATUS_OFFLINE) {
selectedVnode = vnode;
break;
......
......@@ -28,6 +28,8 @@
#include "mgmtDb.h"
#include "mgmtGrant.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtDClient.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
......@@ -387,7 +389,16 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendDropTableMsg(pRemove, &ipSet, NULL);
mTrace("table:%s, send drop table msg", pRemove->tableId);
SRpcMsg rpcMsg = {
.handle = 0,
.pCont = pRemove,
.contLen = sizeof(SMDDropTableMsg),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
mError("table:%s, update ctables sdb error", pTable->tableId);
......
......@@ -70,6 +70,10 @@ void mgmtAddDClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
mgmtProcessDnodeRspFp[msgType] = fp;
}
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsMgmtDClientRpc, ipSet, rpcMsg);
}
static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) {
(*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg);
......@@ -80,17 +84,7 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
rpcFreeCont(rpcMsg->pCont);
}
//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessAlterStreamRsp(SRpcMsg *rpcMsg);
//static void mgmtProcessConfigDnodeRsp(SRpcMsg *rpcMsg);
//
//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
// mTrace("create table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
// if (rpcMsg->handle == NULL) return;
......@@ -173,50 +167,8 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
// mTrace("config dnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
// SRpcMsg rpcMsg = {
// .handle = ahandle,
// .pCont = pCreate,
// .contLen = htonl(pCreate->contLen),
// .code = 0,
// .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
// };
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
//}
//
//void mgmtSendDropTableMsg(SMDDropTableMsg *pDrop, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send drop table msg, ahandle:%p", pDrop->tableId, ahandle);
// SRpcMsg rpcMsg = {
// .handle = ahandle,
// .pCont = pDrop,
// .contLen = sizeof(SMDDropTableMsg),
// .code = 0,
// .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
// };
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
//}
//
//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
// SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
// SRpcMsg rpcMsg = {
// .handle = ahandle,
// .pCont = pCreate,
// .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0,
// .code = 0,
// .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
// };
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
//}
//
//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d, send create all vnodes msg, handle:%p", pVgroup->vgId, ahandle);
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
//
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
......@@ -235,13 +187,7 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
//}
//
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendDropVnodeMsg(pVgroup->vgId, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
////
////int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
//// char *option, *value;
......@@ -278,36 +224,3 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
//// return -1;
////}
////
////int32_t mgmtSendCfgDnodeMsg(char *cont) {
//// SDnodeObj *pDnode;
//// SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont;
//// uint32_t ip;
////
//// ip = inet_addr(pCfg->ip);
//// pDnode = mgmtGetDnode(ip);
//// if (pDnode == NULL) {
//// mError("dnode ip:%s not configured", pCfg->ip);
//// return TSDB_CODE_NOT_CONFIGURED;
//// }
////
//// mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config);
//// int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config);
//// if (code != -1) {
//// return code;
//// }
////
////#ifdef CLUSTER
//// pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_MD_CONFIG_DNODE);
//// if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
//// pMsg = pStart;
////
//// memcpy(pMsg, cont, sizeof(SCfgDnodeMsg));
//// pMsg += sizeof(SCfgDnodeMsg);
////
//// msgLen = pMsg - pStart;
//// mgmtSendMsgToDnode(pDnode, pStart, msgLen);
////#else
//// (void)tsCfgDynamicOptions(pCfg->config);
////#endif
//// return 0;
////}
......@@ -174,21 +174,12 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// free(info);
//}
//
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle);
//}
//
//static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
// mTrace("remove table rsp received, thandle:%p code:%d", thandle, code);
//}
//
//void mgmtSendDropTableMsg(SMDDropTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, sid:%d send remove table msg, ahandle:%p", pRemove->tableId, htonl(pRemove->sid), ahandle);
// if (pRemove != NULL) {
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_DROP_TABLE, pRemove, sizeof(SMDDropTableMsg), ahandle);
// }
//}
//
//static void mgmtProcessDropVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
// mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code);
......@@ -235,18 +226,6 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// }
//}
//
//static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) {
// if (mgmtUpdateGrantInfoFp) {
// mgmtUpdateGrantInfoFp(pCont);
// mTrace("grant info is updated");
// }
//
// SRpcMsg rpcMsg = {0};
// rpcMsg.code = TSDB_CODE_SUCCESS;
// rpcMsg.handle = thandle;
// rpcSendResponse(&rpcMsg);
//}
//
//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
// if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
// mError("invalid msg type:%d", msgType);
......@@ -273,9 +252,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
// } else if (msgType == TSDB_MSG_TYPE_STATUS) {
// mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code);
// } else if (msgType == TSDB_MSG_TYPE_GRANT) {
// mgmtProcessDnodeGrantMsg(pCont, pConn);
// } else {
// } else {
// mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
// }
//
......@@ -296,15 +273,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// }
//}
//
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
//
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
//
//int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
// char *option, *value;
// int32_t olen, valen;
......@@ -340,52 +309,6 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// return -1;
//}
//
//int32_t mgmtSendCfgDnodeMsg(char *cont) {
////#ifdef CLUSTER
//// char * pMsg, *pStart;
//// int32_t msgLen = 0;
////#endif
////
//// SDnodeObj *pDnode;
//// SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont;
//// uint32_t ip;
////
//// ip = inet_addr(pCfg->ip);
//// pDnode = mgmtGetDnode(ip);
//// if (pDnode == NULL) {
//// mError("dnode ip:%s not configured", pCfg->ip);
//// return TSDB_CODE_NOT_CONFIGURED;
//// }
////
//// mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config);
//// int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config);
//// if (code != -1) {
//// return code;
//// }
////
////#ifdef CLUSTER
//// pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_MD_CONFIG_DNODE);
//// if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
//// pMsg = pStart;
////
//// memcpy(pMsg, cont, sizeof(SCfgDnodeMsg));
//// pMsg += sizeof(SCfgDnodeMsg);
////
//// msgLen = pMsg - pStart;
//// mgmtSendMsgToDnode(pDnode, pStart, msgLen);
////#else
//// (void)tsCfgDynamicOptions(pCfg->config);
////#endif
// return 0;
//}
//
//int32_t mgmtInitDnodeInt() {
// if (mgmtInitDnodeIntFp) {
// return mgmtInitDnodeIntFp();
// } else {
// return 0;
// }
//}
//
//void mgmtCleanUpDnodeInt() {
// if (mgmtCleanUpDnodeIntFp) {
......
......@@ -18,31 +18,43 @@
#include "taoserror.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "tutil.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
extern void *tsVgroupSdb;
void *tsDbSdb = NULL;
int32_t tsDbUpdateSize;
void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
void mgmtDbActionInit() {
static void *tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mgmtUpdateDb(SDbObj *pDb);
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate);
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
static int32_t mgmtDropDb(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg);
static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg);
static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg);
static void *(*mgmtDbActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionReset(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void mgmtDbActionInit() {
mgmtDbActionFp[SDB_TYPE_INSERT] = mgmtDbActionInsert;
mgmtDbActionFp[SDB_TYPE_DELETE] = mgmtDbActionDelete;
mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate;
......@@ -52,7 +64,7 @@ void mgmtDbActionInit() {
mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy;
}
void *mgmtDbAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
static void *mgmtDbAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtDbActionFp[(uint8_t)action] != NULL) {
return (*(mgmtDbActionFp[(uint8_t)action]))(row, str, size, ssize);
}
......@@ -96,6 +108,12 @@ int32_t mgmtInitDbs() {
}
}
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CREATE_DB, mgmtProcessCreateDbMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_ALTER_DB, mgmtProcessAlterDbMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_DROP_DB, mgmtProcessDropDbMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DB, mgmtGetDbMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mgmtRetrieveDbs);
mTrace("db data is initialized");
return 0;
}
......@@ -115,7 +133,7 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
return (SDbObj *)sdbGetRow(tsDbSdb, db);
}
int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) {
static int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) {
if (pCreate->commitLog < 0 || pCreate->commitLog > 1) {
mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog);
return TSDB_CODE_INVALID_OPTION;
......@@ -188,7 +206,7 @@ int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate) {
return TSDB_CODE_SUCCESS;
}
int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) {
static int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) {
// assign default parameters
if (pCreate->maxSessions < 0) pCreate->maxSessions = tsSessionsPerVnode; //
if (pCreate->cacheBlockSize < 0) pCreate->cacheBlockSize = tsCacheBlockSize; //
......@@ -233,7 +251,13 @@ int32_t mgmtCheckDbParams(SCreateDbMsg *pCreate) {
return TSDB_CODE_SUCCESS;
}
int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) {
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) {
int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb);
if (numOfDbs >= tsMaxDbs) {
mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs);
return TSDB_CODE_TOO_MANY_DATABASES;
}
int32_t code = mgmtCheckDbLimit(pAcct);
if (code != 0) {
return code;
......@@ -269,11 +293,11 @@ int32_t mgmtCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate) {
return code;
}
int32_t mgmtUpdateDb(SDbObj *pDb) {
static int32_t mgmtUpdateDb(SDbObj *pDb) {
return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1);
}
int32_t mgmtSetDbDropping(SDbObj *pDb) {
static int32_t mgmtSetDbDropping(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0;
SVgObj *pVgroup = pDb->pHead;
......@@ -294,6 +318,16 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) {
}
}
}
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
//
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
//
// mgmtSendDropVgroupMsg(pVgroup);
pVgroup = pVgroup->next;
}
......@@ -310,7 +344,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) {
return 0;
}
bool mgmtCheckDropDbFinished(SDbObj *pDb) {
static bool mgmtCheckDropDbFinished(SDbObj *pDb) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) {
......@@ -333,7 +367,7 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) {
return true;
}
void mgmtDropDbFromSdb(SDbObj *pDb) {
static void mgmtDropDbFromSdb(SDbObj *pDb) {
while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead);
// SSuperTableObj *pMetric = pDb->pSTable;
......@@ -348,13 +382,13 @@ void mgmtDropDbFromSdb(SDbObj *pDb) {
mPrint("db:%s database drop finished", pDb->name);
}
int32_t mgmtDropDb(SDbObj *pDb) {
static int32_t mgmtDropDb(SDbObj *pDb) {
if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) {
bool finished = mgmtCheckDropDbFinished(pDb);
if (!finished) {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
mgmtSendDropVgroupMsg(pVgroup, NULL);
//mgmtSendDropVgroupMsg(pVgroup, NULL);
pVgroup = pVgroup->next;
}
return TSDB_CODE_ACTION_IN_PROGRESS;
......@@ -371,7 +405,7 @@ int32_t mgmtDropDb(SDbObj *pDb) {
}
}
int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name);
if (pDb == NULL) {
if (ignoreNotExists) return TSDB_CODE_SUCCESS;
......@@ -394,7 +428,8 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb));
}
void mgmtMonitorDbDrop(void *unused, void *unusedt) {
UNUSED_FUNC
static void mgmtMonitorDbDrop(void *unused, void *unusedt) {
void * pNode = NULL;
SDbObj *pDb = NULL;
......@@ -407,7 +442,7 @@ void mgmtMonitorDbDrop(void *unused, void *unusedt) {
}
}
int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
static int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
return 0;
// int32_t code = TSDB_CODE_SUCCESS;
//
......@@ -527,7 +562,7 @@ void mgmtCleanUpDbs() {
sdbCloseTable(tsDbSdb);
}
int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -676,7 +711,7 @@ char *mgmtGetDbStr(char *src) {
return ++pos;
}
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDbObj *pDb = NULL;
char * pWrite;
......@@ -868,3 +903,94 @@ void mgmtAddTableIntoDb(SDbObj *pDb) {
void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1);
}
static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SCreateDbMsg *pCreate = (SCreateDbMsg *) rpcMsg->pCont;
pCreate->maxSessions = htonl(pCreate->maxSessions);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->daysPerFile = htonl(pCreate->daysPerFile);
pCreate->daysToKeep = htonl(pCreate->daysToKeep);
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->blocksPerTable = htons(pCreate->blocksPerTable);
pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock);
// pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks);
if (mgmtCheckExpired()) {
rpcRsp.code = TSDB_CODE_GRANT_EXPIRED;
} else if (!pUser->writeAuth) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
rpcRsp.code = mgmtCreateDb(pUser->pAcct, pCreate);
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is created by %s", pCreate->db, pUser->user);
}
}
rpcSendResponse(&rpcRsp);
}
static void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SAlterDbMsg *pAlter = (SAlterDbMsg *) rpcMsg->pCont;
pAlter->daysPerFile = htonl(pAlter->daysPerFile);
pAlter->daysToKeep = htonl(pAlter->daysToKeep);
pAlter->maxSessions = htonl(pAlter->maxSessions) + 1;
if (!pUser->writeAuth) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
rpcRsp.code = mgmtAlterDb(pUser->pAcct, pAlter);
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is altered by %s", pAlter->db, pUser->user);
}
}
rpcSendResponse(&rpcRsp);
}
static void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return ;
}
if (pUser->superAuth) {
SDropDbMsg *pDrop = rpcMsg->pCont;
rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
}
} else {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
}
rpcSendResponse(&rpcRsp);
}
......@@ -18,9 +18,11 @@
#include "tmodule.h"
#include "tschemautil.h"
#include "tstatus.h"
#include "mnode.h"
#include "mgmtDnode.h"
#include "mgmtBalance.h"
#include "mgmtDnode.h"
#include "mgmtDClient.h"
#include "mgmtMnode.h"
#include "mgmtShell.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
......@@ -30,14 +32,18 @@ SDnodeObj *(*mgmtGetDnodeFp)(uint32_t ip) = NULL;
int32_t (*mgmtGetDnodesNumFp)() = NULL;
int32_t (*mgmtUpdateDnodeFp)(SDnodeObj *pDnode) = NULL;
void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL;
int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL;
int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL;
void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL;
int32_t (*mgmtCreateDnodeFp)(uint32_t ip) = NULL;
int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip) = NULL;
static SDnodeObj tsDnodeObj = {0};
static void * mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode);
static bool mgmtCheckConfigShow(SGlobalConfig *cfg);
static int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore;
......@@ -111,122 +117,6 @@ void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes) {
}
}
int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "IP");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "open vnodes");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "free vnodes");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 18;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "balance state");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mgmtGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
return 0;
}
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
int32_t cols = 0;
SDnodeObj *pDnode = NULL;
char *pWrite;
char ipstr[20];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextDnode(pShow, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
cols = 0;
tinet_ntoa(ipstr, pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pDnode->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->openVnodes;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->numOfFreeVnodes;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) );
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetDnodeLbStatusStr(pDnode->lbStatus));
cols++;
tinet_ntoa(ipstr, pDnode->publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
uint32_t status = pDnode->moduleStatus & (1 << moduleType);
......@@ -326,7 +216,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
return numOfRows;
}
int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
static int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
......@@ -367,7 +257,7 @@ int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
return 0;
}
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) {
......@@ -414,7 +304,7 @@ int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pCo
return numOfRows;
}
int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
......@@ -488,7 +378,7 @@ int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
return 0;
}
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
......@@ -538,6 +428,14 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
}
int32_t mgmtInitDnodes() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg);
if (mgmtInitDnodesFp) {
return mgmtInitDnodesFp();
} else {
......@@ -607,25 +505,6 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) {
return *pDnode;
}
int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
if (mgmtGetScoresMetaFp) {
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
return mgmtGetScoresMetaFp(pMeta, pShow, pConn);
} else {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
}
int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
if (mgmtRetrieveScoresFp) {
return mgmtRetrieveScoresFp(pShow, data, rows, pConn);
} else {
return 0;
}
}
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode) {
if (mgmtSetDnodeUnRemoveFp) {
mgmtSetDnodeUnRemoveFp(pDnode);
......@@ -640,16 +519,49 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg) {
return true;
}
/**
* check if a dnode in remove state
**/
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode) {
return pDnode->lbStatus == TSDB_DN_LB_STATUS_OFFLINE_REMOVING || pDnode->lbStatus == TSDB_DN_LB_STATE_SHELL_REMOVING;
}
/**
* check if a dnode in offline state
**/
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) {
return pDnode->status == TSDB_DN_STATUS_OFFLINE;
}
void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SCMCfgDnodeMsg *pCmCfgDnode = (SCMCfgDnodeMsg *) rpcMsg->pCont;
uint32_t dnodeIp = inet_addr(pCmCfgDnode->ip);
if (strcmp(pUser->pAcct->user, "root") != 0) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(dnodeIp);
SMDCfgDnodeMsg *pMdCfgDnode = rpcMallocCont(sizeof(SMDCfgDnodeMsg));
strcpy(pMdCfgDnode->ip, pCmCfgDnode->ip);
strcpy(pMdCfgDnode->config, pCmCfgDnode->config);
SRpcMsg rpcMdCfgDnodeMsg = {
.handle = 0,
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE,
.pCont = pMdCfgDnode,
.contLen = sizeof(SMDCfgDnodeMsg)
};
mgmtSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg);
rpcRsp.code = TSDB_CODE_SUCCESS;
}
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mTrace("dnode:%s is configured by %s", pCmCfgDnode->ip, pUser->user);
}
rpcSendResponse(&rpcRsp);
}
......@@ -15,21 +15,14 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtUser.h"
int32_t (*mgmtCheckUserGrantFp)() = NULL;
int32_t (*mgmtCheckDbGrantFp)() = NULL;
int32_t (*mgmtCheckDnodeGrantFp)() = NULL;
void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL;
bool (*mgmtCheckExpiredFp)() = NULL;
int32_t (*mgmtGetGrantsMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL;
int32_t (*mgmtRetrieveGrantsFp)(SShowObj *pShow, char *data, int rows, void *pConn) = NULL;
void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL;
int32_t mgmtCheckUserGrant() {
if (mgmtCheckUserGrantFp) {
......@@ -47,14 +40,6 @@ int32_t mgmtCheckDbGrant() {
}
}
int32_t mgmtCheckDnodeGrant() {
if (mgmtCheckDnodeGrantFp) {
return (*mgmtCheckDnodeGrantFp)();
} else {
return 0;
}
}
void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries += timeSeriesNum;
if (mgmtAddTimeSeriesFp) {
......@@ -84,22 +69,3 @@ bool mgmtCheckExpired() {
return false;
}
}
int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
if (mgmtGetGrantsMetaFp) {
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
return mgmtGetGrantsMetaFp(pMeta, pShow, pConn);
} else {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
}
int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
if (mgmtRetrieveGrantsFp) {
return mgmtRetrieveGrantsFp(pShow, data, rows, pConn);
} else {
return 0;
}
}
......@@ -14,151 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "trpc.h"
#include "tschemautil.h"
#include "mgmtMnode.h"
#include "mgmtUser.h"
int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL;
int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL;
int32_t (*mgmtGetMnodesNumFp)() = NULL;
void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL;
bool mgmtCheckRedirect(void *handle) {
return false;
}
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp) {
if (mgmtAddMnodeFp) {
return (*mgmtAddMnodeFp)(privateIp, publicIp);
} else {
return 0;
}
}
int32_t mgmtRemoveMnode(uint32_t privateIp) {
if (mgmtRemoveMnodeFp) {
return (*mgmtRemoveMnodeFp)(privateIp);
} else {
return 0;
}
}
static int32_t mgmtGetMnodesNum() {
if (mgmtGetMnodesNumFp) {
return (*mgmtGetMnodesNumFp)();
} else {
return 1;
}
}
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
if (mgmtGetNextMnodeFp) {
return (*mgmtGetNextMnodeFp)(pShow, pMnode);
} else {
if (*pMnode == NULL) {
*pMnode = NULL;
} else {
*pMnode = NULL;
}
}
return *pMnode;
}
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "IP");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
return 0;
}
int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
int32_t cols = 0;
SSdbPeer *pMnode = NULL;
char *pWrite;
char ipstr[20];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode);
if (pMnode == NULL) break;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pMnode->ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pMnode->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbStatusStr[(uint8_t)pMnode->status]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbRoleStr[(uint8_t)pMnode->role]);
cols++;
tinet_ntoa(ipstr, pMnode->publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
return true;
}
\ No newline at end of file
......@@ -24,6 +24,7 @@
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtGrant.h"
#include "mgmtNormalTable.h"
#include "mgmtSuperTable.h"
......@@ -422,7 +423,15 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendDropTableMsg(pRemove, &ipSet, NULL);
mTrace("table:%s, send drop table msg", pRemove->tableId);
SRpcMsg rpcMsg = {
.handle = 0,
.pCont = pRemove,
.contLen = sizeof(SMDDropTableMsg),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
mError("table:%s, update ntables sdb error", pTable->tableId);
......
......@@ -17,7 +17,16 @@
#include "os.h"
#include "taosmsg.h"
#include "tschemautil.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtUser.h"
int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg);
int32_t mgmtKillQuery(char *qidstr, void *pConn);
int32_t mgmtKillStream(char *qidstr, void *pConn);
int32_t mgmtKillConnection(char *qidstr, void *pConn);
typedef struct {
char user[TSDB_TABLE_ID_LEN + 1];
......@@ -663,3 +672,92 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
pShow->numOfReads += numOfRows;
return numOfRows;
}
void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SKillQueryMsg *pKill = (SKillQueryMsg *) rpcMsg->pCont;
int32_t code;
if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtKillQuery(pKill->queryId, rpcMsg->handle);
}
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SKillStreamMsg *pKill = (SKillStreamMsg *) rpcMsg->pCont;
int32_t code;
if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtKillStream(pKill->queryId, rpcMsg->handle);
}
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SKillConnectionMsg *pKill = (SKillConnectionMsg *) rpcMsg->pCont;
int32_t code;
if (!pUser->writeAuth) {
code = TSDB_CODE_NO_RIGHTS;
} else {
code = mgmtKillConnection(pKill->queryId, rpcMsg->handle);
}
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
int32_t mgmtInitProfile() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mgmtGetQueryMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mgmtRetrieveQueries);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONNS, mgmtGetConnsMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONNS, mgmtRetrieveConns);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mgmtGetStreamMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mgmtRetrieveStreams);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_KILL_QUERY, mgmtProcessKillQueryMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_KILL_STREAM, mgmtProcessKillStreamMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_KILL_CONNECTION, mgmtProcessKillConnectionMsg);
return 0;
}
void mgmtCleanUpProfile() {
}
此差异已折叠。
......@@ -29,6 +29,7 @@
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
......@@ -45,6 +46,8 @@ static void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int3
static void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetShowSuperTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static void mgmtDestroySuperTable(SSuperTableObj *pTable) {
free(pTable->schema);
......@@ -186,6 +189,9 @@ int32_t mgmtInitSuperTables() {
mgmtAddSuperTableIntoDb(pDb);
}
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables);
mTrace("stables is initialized");
return 0;
}
......@@ -477,7 +483,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
return TSDB_CODE_SUCCESS;
}
int32_t mgmtGetShowSuperTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
static int32_t mgmtGetShowSuperTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) {
return TSDB_CODE_DB_NOT_SELECTED;
......
......@@ -22,7 +22,9 @@
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "mgmtVgroup.h"
#include "mgmtUser.h"
#include "mgmtSystem.h"
......@@ -39,9 +41,10 @@ void mgmtCleanUpSystem() {
sdbCleanUpPeers();
mgmtCleanupBalance();
mgmtCleanUpDnodeInt();
mgmtCleanupDClient();
mgmtCleanupDServer();
mgmtCleanUpShell();
mgmtCleanUpMeters();
mgmtCleanUpTables();
mgmtCleanUpVgroups();
mgmtCleanUpDbs();
mgmtCleanUpDnodes();
......@@ -114,15 +117,18 @@ int32_t mgmtStartSystem() {
return -1;
}
if (mgmtInitDnodeInt() < 0) {
mError("failed to init inter-mgmt communication");
if (mgmtInitDClient() < 0) {
return -1;
}
// if (mgmtInitShell() < 0) {
// mError("failed to init shell");
// return -1;
// }
if (mgmtInitDServer() < 0) {
return -1;
}
if (mgmtInitShell() < 0) {
mError("failed to init shell");
return -1;
}
if (sdbInitPeers(tsMgmtDirectory) < 0) {
mError("failed to init peers");
......
......@@ -29,8 +29,10 @@
#include "mgmtAcct.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
......@@ -42,6 +44,15 @@
extern void *tsNormalTableSdb;
extern void *tsChildTableSdb;
static void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg);
static void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg);
static void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg);
static void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg);
static void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg);
static void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg);
static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtInitTables() {
int32_t code = mgmtInitSuperTables();
if (code != TSDB_CODE_SUCCESS) {
......@@ -60,6 +71,15 @@ int32_t mgmtInitTables() {
mgmtSetVgroupIdPool();
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CREATE_TABLE, mgmtProcessCreateTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_DROP_TABLE, mgmtProcessDropTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_ALTER_TABLE, mgmtProcessAlterTableMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_TABLE_META, mgmtProcessTableMetaMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_MULTI_TABLE_META, mgmtProcessMultiTableMetaMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_STABLE_META, mgmtProcessSuperTableMetaMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables);
return TSDB_CODE_SUCCESS;
}
......@@ -111,6 +131,114 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
return TSDB_CODE_SUCCESS;
}
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
mError("table:%s, failed to create vgroup, db not found", pCreate->tableId);
rpcRsp.code = TSDB_CODE_INVALID_DB;
rpcSendResponse(&rpcRsp);
return;
}
SVgObj *pVgroup = mgmtCreateVgroup(pDb);
if (pVgroup == NULL) {
mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId);
rpcRsp.code = TSDB_CODE_NO_ENOUGH_DNODES;
rpcSendResponse(&rpcRsp);
return;
}
void *cont = rpcMallocCont(contLen);
if (cont == NULL) {
mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
memcpy(cont, pCreate, contLen);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_VGROUP;
info->thandle = thandle;
info->ahandle = pVgroup;
info->cont = cont;
info->contLen = contLen;
if (isGetMeta) {
info->type = TSDB_PROCESS_CREATE_VGROUP_GET_META;
}
mgmtSendCreateVgroupMsg(pVgroup, info);
}
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
// SRpcMsg rpcMsg = {
// .handle = ahandle,
// .pCont = pCreate,
// .contLen = htonl(pCreate->contLen),
// .code = 0,
// .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
// };
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
//}
//
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
assert(pVgroup != NULL);
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId);
mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta);
return;
}
STableInfo *pTable;
SDMCreateTableMsg *pDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
rpcRsp.code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
} else {
mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
rpcRsp.code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
}
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid);
rpcSendResponse(&rpcRsp);
return;
}
assert(pDCreate != NULL);
assert(pTable != NULL);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->thandle = thandle;
info->ahandle = pTable;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
if (isGetMeta) {
info->type = TSDB_PROCESS_CREATE_TABLE_GET_META;
}
SRpcMsg rpcMsg = {
.handle = info,
.pCont = pCreate,
.contLen = htonl(pDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
}
int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
......@@ -246,7 +374,7 @@ int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
void mgmtCleanUpMeters() {
void mgmtCleanUpTables() {
mgmtCleanUpNormalTables();
mgmtCleanUpChildTables();
mgmtCleanUpSuperTables();
......@@ -422,3 +550,295 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) {
pTable->dirty = isDirty;
}
void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCreateTableMsg *pCreate = (SCreateTableMsg *) rpcMsg->pCont;
pCreate->numOfColumns = htons(pCreate->numOfColumns);
pCreate->numOfTags = htons(pCreate->numOfTags);
pCreate->sqlLen = htons(pCreate->sqlLen);
SSchema *pSchema = (SSchema*) pCreate->schema;
for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = i;
pSchema++;
}
if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table, need redirect message", pCreate->tableId);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("table:%s, failed to create table, invalid user", pCreate->tableId);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (!pUser->writeAuth) {
mError("table:%s, failed to create table, no rights", pCreate->tableId);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
int32_t code = mgmtCreateTable(pCreate, rpcMsg->contLen, rpcMsg->handle, false);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
}
void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDropTableMsg *pDrop = (SDropTableMsg *) rpcMsg->pCont;
if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to drop table, need redirect message", pDrop->tableId);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("table:%s, failed to drop table, invalid user", pDrop->tableId);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
if (!pUser->writeAuth) {
mError("table:%s, failed to drop table, no rights", pDrop->tableId);
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
rpcSendResponse(&rpcRsp);
return;
}
SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId);
if (pDb == NULL) {
mError("table:%s, failed to drop table, db not selected", pDrop->tableId);
rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED;
rpcSendResponse(&rpcRsp);
return;
}
int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
rpcRsp.code = code;
rpcSendResponse(&rpcRsp);
}
}
void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
return;
}
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SAlterTableMsg *pAlter = (SAlterTableMsg *) rpcMsg->pCont;
if (!pUser->writeAuth) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
pAlter->type = htons(pAlter->type);
pAlter->numOfCols = htons(pAlter->numOfCols);
if (pAlter->numOfCols > 2) {
mError("table:%s error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
rpcRsp.code = TSDB_CODE_APP_ERROR;
} else {
SDbObj *pDb = mgmtGetDb(pAlter->db);
if (pDb) {
for (int32_t i = 0; i < pAlter->numOfCols; ++i) {
pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes);
}
rpcRsp.code = mgmtAlterTable(pDb, pAlter);
if (rpcRsp.code == 0) {
mLPrint("table:%s is altered by %s", pAlter->tableId, pUser->user);
}
} else {
rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED;
}
}
}
rpcSendResponse(&rpcRsp);
}
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) {
SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDbObj* pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to get table meta, db not selected", pTable->tableId);
rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED;
rpcSendResponse(&rpcRsp);
return;
}
SRpcConnInfo connInfo;
rpcGetConnInfo(thandle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
rpcRsp.code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMeta);
} else {
pMeta->contLen = htons(pMeta->contLen);
rpcRsp.pCont = pMeta;
rpcRsp.contLen = pMeta->contLen;
}
rpcSendResponse(&rpcRsp);
}
void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp;
rpcRsp.handle = rpcMsg->handle;
rpcRsp.pCont = NULL;
rpcRsp.contLen = 0;
STableInfoMsg *pInfo = rpcMsg->pCont;
pInfo->createFlag = htons(pInfo->createFlag);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
if (pUser == NULL) {
mError("table:%s, failed to get table meta, invalid user", pInfo->tableId);
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
STableInfo *pTable = mgmtGetTable(pInfo->tableId);
if (pTable == NULL) {
if (pInfo->createFlag != 1) {
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
rpcRsp.code = TSDB_CODE_INVALID_TABLE;
rpcSendResponse(&rpcRsp);
return;
} else {
// on demand create table from super table if table does not exists
if (mgmtCheckRedirect(rpcMsg->handle) != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId);
return;
}
int32_t contLen = sizeof(SCreateTableMsg) + sizeof(STagData);
SCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
if (pCreateMsg == NULL) {
mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId);
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
strcpy(pCreateMsg->tableId, pInfo->tableId);
mError("table:%s, start to create table while get meta info", pInfo->tableId);
mgmtCreateTable(pCreateMsg, contLen, rpcMsg->handle, true);
}
} else {
mgmtProcessGetTableMeta(pTable, rpcMsg->handle);
}
}
void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp;
rpcRsp.handle = rpcMsg->handle;
rpcRsp.pCont = NULL;
rpcRsp.contLen = 0;
SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp);
return;
}
SMultiTableInfoMsg *pInfo = rpcMsg->pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables);
int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
if (pMultiMeta == NULL) {
rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(&rpcRsp);
return;
}
pMultiMeta->contLen = sizeof(SMultiTableMeta);
pMultiMeta->numOfTables = 0;
for (int t = 0; t < pInfo->numOfTables; ++t) {
char *tableId = (char*)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN);
STableInfo *pTable = mgmtGetTable(tableId);
if (pTable == NULL) continue;
SDbObj *pDb = mgmtGetDbByTableId(tableId);
if (pDb == NULL) continue;
int availLen = totalMallocLen - pMultiMeta->contLen;
if (availLen <= sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS) {
//TODO realloc
//totalMallocLen *= 2;
//pMultiMeta = rpcReMalloc(pMultiMeta, totalMallocLen);
//if (pMultiMeta == NULL) {
/// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
//} else {
// t--;
// continue;
//}
}
STableMeta *pMeta = (STableMeta *)(pMultiMeta->metas + pMultiMeta->contLen);
int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
if (code == TSDB_CODE_SUCCESS) {
pMultiMeta->numOfTables ++;
pMultiMeta->contLen += pMeta->contLen;
}
}
rpcRsp.pCont = pMultiMeta;
rpcRsp.contLen = pMultiMeta->contLen;
rpcSendResponse(&rpcRsp);
}
void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SSuperTableInfoMsg *pInfo = rpcMsg->pCont;
STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId);
if (pTable == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_TABLE;
rpcSendResponse(&rpcRsp);
return;
}
SSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable);
if (pRsp != NULL) {
int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t);
rpcRsp.pCont = pRsp;
rpcRsp.contLen = msgLen;
rpcSendResponse(&rpcRsp);
} else {
rpcRsp.code = TSDB_CODE_INVALID_TABLE;
rpcSendResponse(&rpcRsp);
}
}
\ No newline at end of file
......@@ -32,7 +32,6 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name);
static int32_t mgmtUpdateUser(SUserObj *pUser);
static int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static SUserObj *mgmtGetUserFromConn(void *pConn);
static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg);
static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg);
......@@ -329,7 +328,7 @@ static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *
return NULL;
}
static SUserObj *mgmtGetUserFromConn(void *pConn) {
SUserObj *mgmtGetUserFromConn(void *pConn) {
SRpcConnInfo connInfo;
rpcGetConnInfo(pConn, &connInfo);
......
......@@ -22,7 +22,9 @@
#include "mnode.h"
#include "mgmtBalance.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
......@@ -38,6 +40,9 @@ static void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t
static void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtVgroupActionInit() {
SVgObj tObj;
tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
......@@ -107,6 +112,9 @@ int32_t mgmtInitVgroups() {
mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
}
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
mTrace("vgroup is initialized");
return 0;
}
......@@ -192,7 +200,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
mgmtSendDropVgroupMsg(pVgroup, NULL);
//mgmtSendDropVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup);
......@@ -560,4 +568,25 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
SRpcIpSet ipSet = {.ip[0] = ip, .numOfIps = 1, .inUse = 0, .port = tsMgmtDnodePort + 1};
return ipSet;
}
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
SRpcMsg rpcMsg = {
.handle = ahandle,
.pCont = pCreate,
.contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0,
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
};
mgmtSendMsgToDnode(ipSet, &rpcMsg);
}
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace("vgroup:%d, send create all vnodes msg, handle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册