diff --git a/src/dnode/inc/dnodeSystem.h b/src/dnode/inc/dnodeSystem.h index 15dcdbb39dcf80e2d9b312dea55050711e18cb39..7aeb26b54ff9afd7390a12d31dc4307eb302bceb 100644 --- a/src/dnode/inc/dnodeSystem.h +++ b/src/dnode/inc/dnodeSystem.h @@ -33,11 +33,11 @@ extern int32_t (*dnodeInitPeers)(int32_t numOfThreads); extern int32_t (*dnodeCheckSystem)(); extern int32_t (*dnodeInitStorage)(); extern void (*dnodeCleanupStorage)(); -extern void (*dnodeParseParameterK)(); extern int32_t tsMaxQueues; extern void ** tsRpcQhandle; extern void *tsQueryQhandle; extern void *tsDnodeMgmtQhandle; +extern void *tsDnodeTmr; int32_t dnodeInitSystem(); void dnodeCleanUpSystem(); diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h index 504439fc7ea6da0957860c059bc33e90949cdee5..a60d74425b673a6a6fb426f04d57627a1b68c484 100644 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ b/src/dnode/inc/dnodeVnodeMgmt.h @@ -58,6 +58,8 @@ int32_t dnodeDropVnode(int32_t vnode); //tsdb_repo_t* dnodeGetVnode(int vid); void* dnodeGetVnode(int32_t vnode); +int32_t dnodeGetVnodesNum(); + /* * get the status of vnode */ diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index e1e7df07af2cb15dcd37d12e1ab448c6cc9e6851..d8273738c37ca20b738c0afdc635630ec153a38c 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -29,10 +29,14 @@ void (*dnodeInitMgmtIpFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL; +void (*dnodeCleanUpMgmtFp)() = NULL; + void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; + +static void *tsStatusTimer = NULL; static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); @@ -86,12 +90,69 @@ void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, } } +void dnodeSendStatusMsgToMgmt(void *handle, void *tmrId) { + taosTmrReset(dnodeSendStatusMsgToMgmt, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + if (tsStatusTimer == NULL) { + dError("Failed to start status timer"); + return; + } + + int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); + SStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + dError("Failed to malloc status message"); + return; + } + + int32_t totalVnodes = dnodeGetVnodesNum(); + + pStatus->version = htonl(tsVersion); + pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); + pStatus->publicIp = htonl(inet_addr(tsPublicIp)); + pStatus->lastReboot = htonl(tsRebootTime); + pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); + pStatus->openVnodes = htons((uint16_t) totalVnodes); + pStatus->numOfCores = htons((uint16_t) tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + pStatus->alternativeRole = (uint8_t) tsAlternativeRole; + + SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load; + + //TODO loop all vnodes +// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) { +// if (vnodeList[vnode].cfg.maxSessions <= 0) continue; +// +// SVnodeObj *pVnode = vnodeList + vnode; +// pLoad->vnode = htonl(vnode); +// pLoad->vgId = htonl(pVnode->cfg.vgId); +// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus; +// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus; +// pLoad->accessState = (uint8_t)(pVnode->accessState); +// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage); +// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage); +// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) { +// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten); +// } else { +// pLoad->pointsWritten = htobe64(0); +// } +// pLoad++; +// +// if (++count >= tsOpenVnodes) { +// break; +// } +// } + + dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen); +} + + int32_t dnodeInitMgmt() { if (dnodeInitMgmtFp) { dnodeInitMgmtFp(); } dnodeInitProcessShellMsg(); + taosTmrReset(dnodeSendStatusMsgToMgmt, 500, NULL, tsDnodeTmr, &tsStatusTimer); return 0; } @@ -101,6 +162,17 @@ void dnodeInitMgmtIp() { } } +void dnodeCleanUpMgmt() { + if (tsStatusTimer != NULL) { + taosTmrStopA(&tsStatusTimer); + tsStatusTimer = NULL; + } + + if (dnodeCleanUpMgmtFp) { + dnodeCleanUpMgmtFp(); + } +} + void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); diff --git a/src/dnode/src/dnodeService.c b/src/dnode/src/dnodeService.c index d7f9721fda9091efeb06d72f9fad2f76b4f65ebc..a8a110ab4cff681d79fb68782b6ab782bb3820c2 100644 --- a/src/dnode/src/dnodeService.c +++ b/src/dnode/src/dnodeService.c @@ -19,6 +19,8 @@ #include "tglobalcfg.h" #include "dnodeSystem.h" +void (*dnodeParseParameterKFp)() = NULL; + /* * Termination handler */ @@ -63,7 +65,9 @@ int main(int argc, char *argv[]) { printf("buildinfo: %s\n", buildinfo); return 0; } else if (strcmp(argv[i], "-k") == 0) { - dnodeParseParameterK(); + if (dnodeParseParameterKFp) { + dnodeParseParameterKFp(); + } #ifdef TAOS_MEM_CHECK } else if (strcmp(argv[i], "--alloc-random-fail") == 0) { if ((i < argc - 1) && (argv[i+1][0] != '-')) { diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 71e8f47e1249ba71425765f9184e13514cba753b..67a1d42565dfac1316833208ee1c0e89bddc5293 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -33,12 +33,12 @@ #include "dnodeVnodeMgmt.h" #ifdef CLUSTER -#include "acct.h" -#include "admin.h" -#include "cluster.h" -#include "grant.h" -#include "replica.h" -#include "storage.h" +//#include "acct.h" +//#include "admin.h" +//#include "cluster.h" +//#include "grant.h" +//#include "replica.h" +//#include "storage.h" #endif static pthread_mutex_t tsDnodeMutex; @@ -48,8 +48,7 @@ static int32_t dnodeInitRpcQHandle(); static int32_t dnodeInitQueryQHandle(); static int32_t dnodeInitTmrCtl(); -void *tsStatusTimer = NULL; -void *vnodeTmrCtrl; +void *tsDnodeTmr; void **tsRpcQhandle; void *tsDnodeMgmtQhandle; void *tsQueryQhandle; @@ -90,10 +89,7 @@ void dnodeCleanUpSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); } - if (tsStatusTimer != NULL) { - taosTmrStopA(&tsStatusTimer); - tsStatusTimer = NULL; - } + dnodeCleanupShell(); dnodeCleanUpModules(); @@ -259,15 +255,15 @@ static int32_t dnodeInitQueryQHandle() { int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode; dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads); - tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); + tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", tsDnodeTmr); return 0; } static int32_t dnodeInitTmrCtl() { - vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, + tsDnodeTmr = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode"); - if (vnodeTmrCtrl == NULL) { + if (tsDnodeTmr == NULL) { dError("failed to init timer, exit"); return -1; } @@ -298,10 +294,6 @@ int32_t dnodeCheckSystemImp() { int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp; -void dnodeParseParameterKImp() {} - -void (*dnodeParseParameterK)() = dnodeParseParameterKImp; - int32_t dnodeInitPeersImp(int32_t numOfThreads) { return 0; } diff --git a/src/dnode/src/dnodeVnodeMgmt.c b/src/dnode/src/dnodeVnodeMgmt.c index cf43f87aaae1882f52bac362bdad294d3d475f2e..fd1a0b6f2857799813448e8bd73e0b3faa50aa3f 100644 --- a/src/dnode/src/dnodeVnodeMgmt.c +++ b/src/dnode/src/dnodeVnodeMgmt.c @@ -59,3 +59,6 @@ bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { return true; } +int32_t dnodeGetVnodesNum() { + return 1; +} diff --git a/src/inc/.fuse_hidden0000e2ae00000244 b/src/inc/.fuse_hidden0000e2ae00000244 deleted file mode 100644 index 6c704221906284c996e6a0fda354bd26c9d5655c..0000000000000000000000000000000000000000 --- a/src/inc/.fuse_hidden0000e2ae00000244 +++ /dev/null @@ -1,816 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_TAOSMSG_H -#define TDENGINE_TAOSMSG_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include - -#include "taosdef.h" -#include "taoserror.h" -#include "taosdef.h" -#include "trpc.h" - -// message type -#define TSDB_MSG_TYPE_REG 1 -#define TSDB_MSG_TYPE_REG_RSP 2 -#define TSDB_MSG_TYPE_DNODE_SUBMIT 3 -#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 -#define TSDB_MSG_TYPE_DNODE_QUERY 5 -#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 -#define TSDB_MSG_TYPE_RETRIEVE 7 -#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 -#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9 -#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 -#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 -#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12 -#define TSDB_MSG_TYPE_DNODE_CREATE_VNODE 13 -#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14 -#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15 -#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16 -#define TSDB_MSG_TYPE_DNODE_CFG 17 -#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18 -#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19 -#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 20 -#define TSDB_MSG_TYPE_SDB_SYNC 21 -#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22 -#define TSDB_MSG_TYPE_SDB_FORWARD 23 -#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24 -#define TSDB_MSG_TYPE_CONNECT 31 -#define TSDB_MSG_TYPE_CONNECT_RSP 32 -#define TSDB_MSG_TYPE_CREATE_ACCT 33 -#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34 -#define TSDB_MSG_TYPE_ALTER_ACCT 35 -#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36 -#define TSDB_MSG_TYPE_DROP_ACCT 37 -#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38 -#define TSDB_MSG_TYPE_CREATE_USER 39 -#define TSDB_MSG_TYPE_CREATE_USER_RSP 40 -#define TSDB_MSG_TYPE_ALTER_USER 41 -#define TSDB_MSG_TYPE_ALTER_USER_RSP 42 -#define TSDB_MSG_TYPE_DROP_USER 43 -#define TSDB_MSG_TYPE_DROP_USER_RSP 44 -#define TSDB_MSG_TYPE_CREATE_MNODE 45 -#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46 -#define TSDB_MSG_TYPE_DROP_MNODE 47 -#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48 -#define TSDB_MSG_TYPE_CREATE_DNODE 49 -#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50 -#define TSDB_MSG_TYPE_DROP_DNODE 51 -#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52 -#define TSDB_MSG_TYPE_ALTER_DNODE 53 -#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54 -#define TSDB_MSG_TYPE_CREATE_DB 55 -#define TSDB_MSG_TYPE_CREATE_DB_RSP 56 -#define TSDB_MSG_TYPE_DROP_DB 57 -#define TSDB_MSG_TYPE_DROP_DB_RSP 58 -#define TSDB_MSG_TYPE_USE_DB 59 -#define TSDB_MSG_TYPE_USE_DB_RSP 60 -#define TSDB_MSG_TYPE_ALTER_DB 61 -#define TSDB_MSG_TYPE_ALTER_DB_RSP 62 -#define TSDB_MSG_TYPE_CREATE_TABLE 63 -#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64 -#define TSDB_MSG_TYPE_DROP_TABLE 65 -#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66 -#define TSDB_MSG_TYPE_ALTER_TABLE 67 -#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68 -#define TSDB_MSG_TYPE_VNODE_CFG 69 -#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70 -#define TSDB_MSG_TYPE_TABLE_CFG 71 -#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72 -#define TSDB_MSG_TYPE_TABLE_META 73 -#define TSDB_MSG_TYPE_TABLE_META_RSP 74 -#define TSDB_MSG_TYPE_STABLE_META 75 -#define TSDB_MSG_TYPE_STABLE_META_RSP 76 -#define TSDB_MSG_TYPE_MULTI_TABLE_META 77 -#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78 -#define TSDB_MSG_TYPE_ALTER_STREAM 79 -#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80 -#define TSDB_MSG_TYPE_SHOW 81 -#define TSDB_MSG_TYPE_SHOW_RSP 82 -#define TSDB_MSG_TYPE_CFG_MNODE 83 -#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84 -#define TSDB_MSG_TYPE_KILL_QUERY 85 -#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86 -#define TSDB_MSG_TYPE_KILL_STREAM 87 -#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88 -#define TSDB_MSG_TYPE_KILL_CONNECTION 89 -#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90 -#define TSDB_MSG_TYPE_HEARTBEAT 91 -#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92 -#define TSDB_MSG_TYPE_STATUS 93 -#define TSDB_MSG_TYPE_STATUS_RSP 94 -#define TSDB_MSG_TYPE_GRANT 95 -#define TSDB_MSG_TYPE_GRANT_RSP 96 -#define TSDB_MSG_TYPE_MAX 97 - -// IE type -#define TSDB_IE_TYPE_SEC 1 -#define TSDB_IE_TYPE_META 2 -#define TSDB_IE_TYPE_MGMT_IP 3 -#define TSDB_IE_TYPE_DNODE_CFG 4 -#define TSDB_IE_TYPE_NEW_VERSION 5 -#define TSDB_IE_TYPE_DNODE_EXT 6 -#define TSDB_IE_TYPE_DNODE_STATE 7 - -enum _mgmt_table { - TSDB_MGMT_TABLE_ACCT, - TSDB_MGMT_TABLE_USER, - TSDB_MGMT_TABLE_DB, - TSDB_MGMT_TABLE_TABLE, - TSDB_MGMT_TABLE_DNODE, - TSDB_MGMT_TABLE_MNODE, - TSDB_MGMT_TABLE_VGROUP, - TSDB_MGMT_TABLE_METRIC, - TSDB_MGMT_TABLE_MODULE, - TSDB_MGMT_TABLE_QUERIES, - TSDB_MGMT_TABLE_STREAMS, - TSDB_MGMT_TABLE_CONFIGS, - TSDB_MGMT_TABLE_CONNS, - TSDB_MGMT_TABLE_SCORES, - TSDB_MGMT_TABLE_GRANTS, - TSDB_MGMT_TABLE_VNODES, - TSDB_MGMT_TABLE_MAX, -}; - -#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 -#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 -#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 -#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 - -#define TSDB_ALTER_TABLE_ADD_COLUMN 5 -#define TSDB_ALTER_TABLE_DROP_COLUMN 6 - -#define TSDB_INTERPO_NONE 0 -#define TSDB_INTERPO_NULL 1 -#define TSDB_INTERPO_SET_VALUE 2 -#define TSDB_INTERPO_LINEAR 3 -#define TSDB_INTERPO_PREV 4 - -#define TSDB_ALTER_USER_PASSWD 0x1 -#define TSDB_ALTER_USER_PRIVILEGES 0x2 - -#define TSDB_KILL_MSG_LEN 30 - -typedef enum { - TSDB_TABLE_TYPE_SUPER_TABLE = 0, // super table - TSDB_TABLE_TYPE_CHILD_TABLE = 1, // table created from super table - TSDB_TABLE_TYPE_NORMAL_TABLE = 2, // ordinary table - TSDB_TABLE_TYPE_STREAM_TABLE = 3, // table created from stream computing - TSDB_TABLE_TYPE_MAX = 4 -} ETableType; - - -#define TSDB_VN_READ_ACCCESS ((char)0x1) -#define TSDB_VN_WRITE_ACCCESS ((char)0x2) -#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) - -#define TSDB_COL_NORMAL 0x0U -#define TSDB_COL_TAG 0x1U -#define TSDB_COL_JOIN 0x2U - -extern char *taosMsg[]; - -#pragma pack(push, 1) - -typedef struct { - int32_t vnode; - int32_t sid; - int32_t sversion; - uint64_t uid; - int16_t numOfRows; - char payLoad[]; -} SShellSubmitBlock; - -typedef struct { - int8_t import; - int8_t reserved[3]; - int32_t numOfSid; /* total number of sid */ - char blks[]; /* numOfSid blocks, each blocks for one table */ -} SShellSubmitMsg; - -typedef struct { - int32_t index; // index of failed block in submit blocks - int32_t vnode; // vnode index of failed block - int32_t sid; // table index of failed block - int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table -} SShellSubmitRspBlock; - -typedef struct { - int32_t code; // 0-success, > 0 error code - int32_t numOfRows; // number of records the client is trying to write - int32_t affectedRows; // number of records actually written - int32_t failedRows; // number of failed records (exclude duplicate records) - int32_t numOfFailedBlocks; - SShellSubmitRspBlock failedBlocks[]; -} SShellSubmitRspMsg; - -typedef struct SSchema { - uint8_t type; - char name[TSDB_COL_NAME_LEN + 1]; - int16_t colId; - int16_t bytes; -} SSchema; - -typedef struct { - int32_t vnode; //the index of vnode - uint32_t ip; -} SVPeerDesc; - -typedef struct { - int8_t tableType; - int16_t numOfColumns; - int16_t numOfTags; - int32_t sid; - int32_t sversion; - int32_t tagDataLen; - int32_t sqlDataLen; - int32_t contLen; - int32_t numOfVPeers; - uint64_t uid; - uint64_t superTableUid; - uint64_t createdTime; - SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; - char tableId[TSDB_TABLE_ID_LEN + 1]; - char superTableId[TSDB_TABLE_ID_LEN + 1]; - char data[]; -} SDCreateTableMsg; - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - char db[TSDB_DB_NAME_LEN + 1]; - int8_t igExists; - int16_t numOfTags; - int16_t numOfColumns; - int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string - int16_t reserved[16]; - SSchema schema[]; -} SCreateTableMsg; - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - int8_t igNotExists; -} SDropTableMsg; - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - char db[TSDB_DB_NAME_LEN + 1]; - int16_t type; /* operation type */ - char tagVal[TSDB_MAX_BYTES_PER_ROW]; - int8_t numOfCols; /* number of schema */ - SSchema schema[]; -} SAlterTableMsg; - -typedef struct { - char clientVersion[TSDB_VERSION_LEN]; - char msgVersion[TSDB_VERSION_LEN]; - char db[TSDB_TABLE_ID_LEN + 1]; -} SConnectMsg; - -typedef struct { - char acctId[TSDB_ACCT_LEN + 1]; - char serverVersion[TSDB_VERSION_LEN]; - int8_t writeAuth; - int8_t superAuth; - SRpcIpSet ipList; -} SConnectRsp; - -typedef struct { - int32_t maxUsers; - int32_t maxDbs; - int32_t maxTimeSeries; - int32_t maxConnections; - int32_t maxStreams; - int32_t maxPointsPerSecond; - int64_t maxStorage; // In unit of GB - int64_t maxQueryTime; // In unit of hour - int64_t maxInbound; - int64_t maxOutbound; - int8_t accessState; // Configured only by command -} SAcctCfg; - -typedef struct { - char user[TSDB_USER_LEN + 1]; - char pass[TSDB_KEY_LEN + 1]; - SAcctCfg cfg; -} SCreateAcctMsg, SAlterAcctMsg; - -typedef struct { - char user[TSDB_USER_LEN + 1]; -} SDropUserMsg, SDropAcctMsg; - -typedef struct { - char user[TSDB_USER_LEN + 1]; - char pass[TSDB_KEY_LEN + 1]; - int8_t privilege; - int8_t flag; -} SCreateUserMsg, SAlterUserMsg; - -typedef struct { - char db[TSDB_TABLE_ID_LEN + 1]; -} SMgmtHead; - -typedef struct { - int32_t sid; - int32_t numOfVPeers; - uint64_t uid; - SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; - char tableId[TSDB_TABLE_ID_LEN + 1]; -} SDRemoveTableMsg; - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - int64_t uid; -} SDRemoveSuperTableMsg; - -typedef struct { - int32_t vnode; -} SFreeVnodeMsg; - -typedef struct SColIndexEx { - int16_t colId; - /* - * colIdx is the index of column in latest schema of table - * it is available in the client side. Also used to determine - * whether current table schema is up-to-date. - * - * colIdxInBuf is used to denote the index of column in pQuery->colList, - * this value is invalid in client side, as well as in cache block of vnode either. - */ - int16_t colIdx; - int16_t colIdxInBuf; - uint16_t flag; // denote if it is a tag or not -} SColIndexEx; - -/* sql function msg, to describe the message to vnode about sql function - * operations in select clause */ -typedef struct SSqlFuncExprMsg { - int16_t functionId; - int16_t numOfParams; - - SColIndexEx colInfo; - struct ArgElem { - int16_t argType; - int16_t argBytes; - union { - double d; - int64_t i64; - char * pz; - } argValue; - } arg[3]; -} SSqlFuncExprMsg; - -typedef struct SSqlBinaryExprInfo { - struct tSQLBinaryExpr *pBinExpr; /* for binary expression */ - int32_t numOfCols; /* binary expression involves the readed number of columns*/ - SColIndexEx * pReqColumns; /* source column list */ -} SSqlBinaryExprInfo; - -typedef struct SSqlFunctionExpr { - SSqlFuncExprMsg pBase; - SSqlBinaryExprInfo pBinExprInfo; - int16_t resBytes; - int16_t resType; - int16_t interResBytes; -} SSqlFunctionExpr; - -typedef struct SColumnFilterInfo { - int16_t lowerRelOptr; - int16_t upperRelOptr; - int16_t filterOnBinary; /* denote if current column is binary */ - - union { - struct { - int64_t lowerBndi; - int64_t upperBndi; - }; - struct { - double lowerBndd; - double upperBndd; - }; - struct { - int64_t pz; - int64_t len; - }; - }; -} SColumnFilterInfo; - -/* - * for client side struct, we only need the column id, type, bytes are not necessary - * But for data in vnode side, we need all the following information. - */ -typedef struct SColumnInfo { - int16_t colId; - int16_t type; - int16_t bytes; - int16_t numOfFilters; - SColumnFilterInfo *filters; -} SColumnInfo; - -/* - * enable vnode to understand how to group several tables with different tag; - */ -typedef struct STableSidExtInfo { - int32_t sid; - int64_t uid; - TSKEY key; // key for subscription - char tags[]; -} STableSidExtInfo; - -/* - * the outputCols is equalled to or larger than numOfCols - * e.g., select min(colName), max(colName), avg(colName) from table - * the outputCols will be 3 while the numOfCols is 1. - */ -typedef struct { - int16_t vnode; - int32_t numOfSids; - uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may - - uint64_t uid; - TSKEY skey; - TSKEY ekey; - - int16_t order; - int16_t orderColId; - - int16_t numOfCols; // the number of columns will be load from vnode - char intervalTimeUnit; // time interval type, for revisement of interval(1d) - - int64_t nAggTimeInterval; // time interval for aggregation, in million second - int64_t slidingTime; // value for sliding window - - // tag schema, used to parse tag information in pSidExtInfo - uint64_t pTagSchema; - - int16_t numOfTagsCols; // required number of tags - int16_t tagLength; // tag length in current query - - int16_t numOfGroupCols; // num of group by columns - int16_t orderByIdx; - int16_t orderType; // used in group by xx order by xxx - uint64_t groupbyTagIds; - - int64_t limit; - int64_t offset; - - int16_t queryType; // denote another query process - int16_t numOfOutputCols; // final output columns numbers - - int16_t interpoType; // interpolate type - uint64_t defaultVal; // default value array list - - int32_t colNameLen; - int64_t colNameList; - - int64_t pSqlFuncExprs; - - int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed - int32_t tsLen; // total length of ts comp block - int32_t tsNumOfBlocks; // ts comp block numbers - int32_t tsOrder; // ts comp block order - SColumnInfo colList[]; -} SQueryTableMsg; - -typedef struct { - char code; - uint64_t qhandle; -} SQueryTableRsp; - -typedef struct { - uint64_t qhandle; - uint16_t free; -} SRetrieveTableMsg; - -typedef struct { - int32_t numOfRows; - int16_t precision; - int64_t offset; // updated offset value for multi-vnode projection query - int64_t useconds; - char data[]; -} SRetrieveTableRsp; - -typedef struct { - uint32_t vnode; - uint32_t vgId; - uint8_t status; - uint8_t dropStatus; - uint8_t accessState; - int64_t totalStorage; - int64_t compStorage; - int64_t pointsWritten; - uint8_t syncStatus; - uint8_t reserved[15]; -} SVnodeLoad; - -typedef struct { - uint32_t vnode; - char accessState; -} SVnodeAccess; - -/* - * NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4 - */ -typedef struct { - char acct[TSDB_USER_LEN + 1]; - char db[TSDB_DB_NAME_LEN + 1]; - uint32_t vgId; - int32_t maxSessions; - int32_t cacheBlockSize; - union { - int32_t totalBlocks; - float fraction; - } cacheNumOfBlocks; - int32_t daysPerFile; - int32_t daysToKeep1; - int32_t daysToKeep2; - int32_t daysToKeep; - int32_t commitTime; - int32_t rowsInFileBlock; - int16_t blocksPerTable; - int8_t compression; - int8_t commitLog; - int8_t replications; - int8_t repStrategy; - int8_t loadLatest; // load into mem or not - uint8_t precision; // time resolution - int8_t reserved[16]; -} SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg; - -typedef struct { - char db[TSDB_TABLE_ID_LEN + 1]; - uint8_t ignoreNotExists; -} SDropDbMsg, SUseDbMsg; - -// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed -// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE -typedef struct { - int64_t pointsWritten; // In unit of points - int64_t totalStorage; // In unit of bytes - int64_t compStorage; // In unit of bytes - int64_t queryTime; // In unit of second ?? - char reserved[64]; -} SVnodeStatisticInfo; - -typedef struct { - uint32_t version; - uint32_t publicIp; - uint32_t lastReboot; // time stamp for last reboot - uint16_t numOfCores; - uint8_t alternativeRole; - uint8_t reserve; - uint16_t numOfTotalVnodes; // from config file - uint16_t unused; - float diskAvailable; // GB - uint32_t openVnodes; - char reserved[16]; - SVnodeLoad load[]; -} SStatusMsg; - -typedef struct { - int32_t code; - SRpcIpSet ipList; -} SStatusRsp; - -typedef struct { - uint32_t moduleStatus; - uint32_t createdTime; - uint32_t numOfVnodes; - uint32_t reserved; -} SDnodeState; - -// internal message -typedef struct { - uint32_t destId; - uint32_t destIp; - char tableId[TSDB_UNI_LEN + 1]; - char empty[3]; - uint8_t msgType; - int32_t msgLen; - uint8_t content[0]; -} SIntMsg; - -typedef struct { - char spi; - char encrypt; - char secret[TSDB_KEY_LEN]; // key is changed if updated - char cipheringKey[TSDB_KEY_LEN]; -} SSecIe; - -typedef struct { - int32_t numOfVPeers; - SVPeerDesc vpeerDesc[]; -} SVpeerDescArray; - -typedef struct { - int32_t vnode; - SVnodeCfg cfg; - SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; -} SCreateVnodeMsg; - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - int16_t createFlag; - char tags[]; -} STableInfoMsg; - -typedef struct { - int32_t numOfTables; - char tableIds[]; -} SMultiTableInfoMsg; - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; -} SSuperTableInfoMsg; - -typedef struct { - int32_t numOfDnodes; - uint32_t dnodeIps[]; -} SSuperTableInfoRsp; - -typedef struct { - int16_t elemLen; - - char tableId[TSDB_TABLE_ID_LEN + 1]; - int16_t orderIndex; - int16_t orderType; // used in group by xx order by xxx - - int16_t rel; // denotes the relation between condition and table list - - int32_t tableCond; // offset value of table name condition - int32_t tableCondLen; - - int32_t cond; // offset of column query condition - int32_t condLen; - - int16_t tagCols[TSDB_MAX_TAGS + 1]; // required tag columns, plus one is for table name - int16_t numOfTags; // required number of tags - - int16_t numOfGroupCols; // num of group by columns - int32_t groupbyTagColumnList; -} SSuperTableMetaElemMsg; - -typedef struct { - int32_t numOfTables; - int32_t join; - int32_t joinCondLen; // for join condition - int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM]; -} SSuperTableMetaMsg; - -typedef struct { - SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; - int16_t index; // used locally - int32_t numOfSids; - int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo -} SVnodeSidList; - -typedef struct { - int32_t numOfTables; - int32_t numOfVnodes; - uint16_t tagLen; /* tag value length */ - int32_t list[]; /* offset of SVnodeSidList, compared to the SSuperTableMeta struct */ -} SSuperTableMeta; - -typedef struct STableMeta { - char tableId[TSDB_TABLE_ID_LEN + 1]; // note: This field must be at the front - int32_t contLen; - uint8_t numOfTags : 6; - uint8_t precision : 2; - uint8_t tableType : 4; - uint8_t index : 4; // used locally - int16_t numOfColumns; - int16_t rowSize; // used locally, calculated in client - int16_t sversion; - int8_t numOfVpeers; - SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; - int32_t sid; - int32_t vgid; - uint64_t uid; - SSchema schema[]; -} STableMeta; - -typedef struct SMultiTableMeta { - int32_t numOfTables; - int32_t contLen; - STableMeta metas[]; -} SMultiTableMeta; - -typedef struct { - char name[TSDB_TABLE_ID_LEN + 1]; - char data[TSDB_MAX_TAGS_LEN]; -} STagData; - -/* - * sql: show tables like '%a_%' - * payload is the query condition, e.g., '%a_%' - * payloadLen is the length of payload - */ -typedef struct { - int8_t type; - char db[TSDB_DB_NAME_LEN + 1]; - uint16_t payloadLen; - char payload[]; -} SShowMsg; - -typedef struct { - uint64_t qhandle; - STableMeta tableMeta; -} SShowRsp; - -typedef struct { - char ip[32]; -} SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg; - -typedef struct { - uint32_t dnode; - int32_t vnode; - int32_t sid; -} STableCfgMsg; - -typedef struct { - uint32_t dnode; - int32_t vnode; -} SVpeerCfgMsg; - -typedef struct { - char ip[32]; - char config[64]; -} SCfgDnodeMsg; - -typedef struct { - char sql[TSDB_SHOW_SQL_LEN + 1]; - uint32_t queryId; - int64_t useconds; - int64_t stime; -} SQueryDesc; - -typedef struct { - char sql[TSDB_SHOW_SQL_LEN + 1]; - uint32_t streamId; - int64_t num; // number of computing/cycles - int64_t useconds; - int64_t ctime; - int64_t stime; - int64_t slidingTime; - int64_t interval; -} SStreamDesc; - -typedef struct { - int32_t numOfQueries; - SQueryDesc qdesc[]; -} SQqueryList; - -typedef struct { - int32_t numOfStreams; - SStreamDesc sdesc[]; -} SStreamList; - -typedef struct { - SQqueryList qlist; - SStreamList slist; -} SHeartBeatMsg; - -typedef struct { - uint32_t queryId; - uint32_t streamId; - int8_t killConnection; - SRpcIpSet ipList; -} SHeartBeatRsp; - -typedef struct { - char queryId[TSDB_KILL_MSG_LEN + 1]; -} SKillQueryMsg, SKillStreamMsg, SKillConnectionMsg; - -typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; - uint64_t stime; // stream starting time - int32_t status; - char tableId[TSDB_TABLE_ID_LEN + 1]; -} SDAlterStreamMsg; - -#pragma pack(pop) - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 45ff014dc51fa92d1abfb335751d3c44f61246a0..15a6096826f2c0e96e6890956856ad783a592fae 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -42,23 +42,19 @@ extern uint32_t tsRebootTime; // dnodeCluster extern void (*dnodeStartModules)(); -extern void (*dnodeParseParameterK)(); extern int32_t (*dnodeCheckSystem)(); // dnodeSystem extern void *tsDnodeMgmtQhandle; +void dnodeCheckDataDirOpenned(const char* dir); void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); // dnodeModule extern void (*dnodeStartModules)(); -// multilevelStorage -extern int32_t (*dnodeInitStorage)(); -extern void (*dnodeCleanupStorage)(); -void dnodeCheckDataDirOpenned(const char* dir); void dnodeLockVnodes(); void dnodeUnLockVnodes(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index ca6294f623f6a6bdc5e6981068cd0690d0aef153..34c8b8c77e4821ebb7a1aab1c4e148ffd3893302 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -258,14 +258,12 @@ typedef struct { } SShowObj; //mgmtSystem +int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); -void mgmtCleanUpSystem(); -void mgmtProcessMsgFromDnode(char msgType, void *pCont, int contLen, void *pConn, int32_t code); -extern int32_t (*mgmtInitSystem)(); -extern void (*mgmtStopSystem)(); -extern void (*mgmtCleanUpRedirect)(); - +void mgmtCleanUpSystem(); +void mgmtStopSystem(); +void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); #ifdef __cplusplus } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 6861d31b873a7898ae1e9812ade92cb25ff0c543..0e2ad607b0f6a9a4680e92f015fd34f57ca677c8 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -163,6 +163,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismat TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG_VERSION, 0, 122, "invalid version of message") +TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 123, "dnode not exist") #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f74bff7e57fa5a0e9773263d23ea536068ef866e..a05bd20f84786f6e5f19e137b41ed559547d36f2 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -309,8 +309,8 @@ typedef struct { } SAcctCfg; typedef struct { - char user[TSDB_USER_LEN + 1]; - char pass[TSDB_KEY_LEN + 1]; + char user[TSDB_USER_LEN + 1]; + char pass[TSDB_KEY_LEN + 1]; SAcctCfg cfg; } SCreateAcctMsg, SAlterAcctMsg; @@ -572,33 +572,35 @@ typedef struct { char reserved[64]; } SVnodeStatisticInfo; +typedef struct { + uint32_t moduleStatus; + uint32_t createdTime; + uint32_t numOfVnodes; + uint32_t reserved; +} SDnodeState; + typedef struct { uint32_t version; + uint32_t privateIp; uint32_t publicIp; - uint32_t lastReboot; // time stamp for last reboot + uint32_t lastReboot; // time stamp for last reboot + uint16_t numOfTotalVnodes; // from config file + uint16_t openVnodes; uint16_t numOfCores; + float diskAvailable; // GB uint8_t alternativeRole; - uint8_t reserve; - uint16_t numOfTotalVnodes; // from config file - uint16_t unused; - float diskAvailable; // GB - uint32_t openVnodes; - char reserved[16]; + uint8_t reserve[15]; SVnodeLoad load[]; } SStatusMsg; typedef struct { - int32_t code; - SRpcIpSet ipList; + int32_t code; + int32_t numOfVnodes; + SDnodeState dnodeState; + SRpcIpSet ipList; + SVnodeAccess vnodeAccess[]; } SStatusRsp; -typedef struct { - uint32_t moduleStatus; - uint32_t createdTime; - uint32_t numOfVnodes; - uint32_t reserved; -} SDnodeState; - // internal message typedef struct { uint32_t destId; diff --git a/src/mnode/inc/mgmtAcct.h b/src/mnode/inc/mgmtAcct.h index edc30409d6e4bf1f4b75fdeaa9f71ad935f8c617..1aaa35e6851177c0ea7561a465a28b80628649bc 100644 --- a/src/mnode/inc/mgmtAcct.h +++ b/src/mnode/inc/mgmtAcct.h @@ -22,19 +22,24 @@ extern "C" { #include "mnode.h" +int32_t mgmtInitAccts(); +void mgmtCleanUpAccts(); +SAcctObj *mgmtGetAcct(char *acctName); + +int32_t mgmtCheckUserLimit(SAcctObj *pAcct); +int32_t mgmtCheckDbLimit(SAcctObj *pAcct); +int32_t mgmtCheckTableLimit(SAcctObj *pAcct, int32_t numOfTimeSeries); +int32_t mgmtGetAcctMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveAccts(SShowObj *pShow, char *data, int32_t rows, void *pConn); + int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb); int32_t mgmtRemoveDbFromAcct(SAcctObj *pAcct, SDbObj *pDb); int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser); int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser); -extern int32_t (*mgmtInitAccts)(); -extern void (*mgmtCleanUpAccts)(); -extern SAcctObj* (*mgmtGetAcct)(char *acctName); -extern int32_t (*mgmtCheckUserLimit)(SAcctObj *pAcct); -extern int32_t (*mgmtCheckDbLimit)(SAcctObj *pAcct); -extern int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate); -extern int32_t (*mgmtGetAcctMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn); +extern int32_t (*mgmtCreateAcctFp)(char *name, char *pass, SAcctCfg *pCfg); +extern int32_t (*mgmtDropAcctFp)(char *name); +extern int32_t (*mgmtAlterAcctFp)(char *name, char *pass, SAcctCfg *pCfg); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 493f2fba0d6203a7c9ac9d3ce3134f75c94ffe67..61331b9b3d3c8d90cfe48550511e44e07a8fbd5e 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -20,19 +20,13 @@ extern "C" { #endif -#include "os.h" #include "mnode.h" -extern void (*mgmtStartBalanceTimer)(int64_t mseconds); -extern int32_t (*mgmtInitBalance)(); -extern void (*mgmtCleanupBalance)(); -extern int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup); -extern bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType); -extern char* (*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode); -extern bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode); -extern void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus); -extern void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp); -extern bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode); +void mgmtStartBalanceTimer(int64_t mseconds); +int32_t mgmtInitBalance(); +void mgmtCleanupBalance(); +int32_t mgmtAllocVnodes(SVgObj *pVgroup); +char* mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 6159d5e5dca0336fa979295da79d04688600e553..4cdac1e7afd367fedcab36a07f6e36d26a215551 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -24,10 +24,6 @@ extern "C" { #include #include "mnode.h" -int32_t mgmtCreateDnode(uint32_t ip); -int32_t mgmtDropDnode(SDnodeObj *pDnode); -int32_t mgmtDropDnodeByIp(uint32_t ip); -int32_t mgmtGetNextVnode(SVnodeGid *pVnodeGid); void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId); void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes); int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); @@ -38,25 +34,24 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode); int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); +bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType); int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -extern int32_t (*mgmtInitDnodes)(); -extern void (*mgmtCleanUpDnodes)(); -extern SDnodeObj* (*mgmtGetDnode)(uint32_t ip); -extern int32_t (*mgmtGetDnodesNum)(); -extern void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode); -extern int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode); -extern void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode); -extern int32_t (*mgmtGetScoresMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg); - -extern SDnodeObj tsDnodeObj; - +int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +int32_t mgmtInitDnodes(); +void mgmtCleanUpDnodes(); +int32_t mgmtGetDnodesNum(); +int32_t mgmtUpdateDnode(SDnodeObj *pDnode); +void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode); +bool mgmtCheckConfigShow(SGlobalConfig *cfg); +void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); +SDnodeObj* mgmtGetDnode(uint32_t ip); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 772c2ba3310366c0006e7d1ce76a2e35652c0cc0..d3f5cffac3074be4921baaae28e52e0fc9a501ed 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -34,12 +34,12 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle); -extern int32_t (*mgmtInitDnodeInt)(); -extern void (*mgmtCleanUpDnodeInt)(); -extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); +int32_t mgmtInitDnodeInt(); +void mgmtCleanUpDnodeInt(); void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle); void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); +void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtGrant.h b/src/mnode/inc/mgmtGrant.h index e68e6ae71e31e6d961cf3f3a2ce352cb9a46d16f..1cfc88f94af206c4291e4cf2be7551cffbe77244 100644 --- a/src/mnode/inc/mgmtGrant.h +++ b/src/mnode/inc/mgmtGrant.h @@ -17,21 +17,23 @@ #define TDENGINE_MGMT_GTANT_H #ifdef __cplusplus -extern "C" { +"C" { #endif #include #include #include "mnode.h" -extern bool (*mgmtCheckExpired)(); -extern void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum); -extern void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeseries); -extern int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries); -extern int32_t (*mgmtCheckUserGrant)(); -extern int32_t (*mgmtCheckDbGrant)(); -extern int32_t (*mgmtGetGrantsMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn); +bool mgmtCheckExpired(); +void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum); +void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries); +int32_t mgmtCheckTimeSeries(uint32_t timeseries); +int32_t mgmtCheckUserGrant(); +int32_t mgmtCheckDbGrant(); +int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +extern void (*mgmtUpdateGrantInfoFp)(void *pCont); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index e27296de7745abc83ef8adecbfca49744f661cbc..d012997d136903d9ae232756a87b38fb54d33010 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -24,8 +24,8 @@ extern "C" { #include #include "mnode.h" -extern int32_t (*mgmtGetMnodeMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn); + int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index 56668b512ffd6ee886d7e1bdef91007db07a8794..f14871b5b205e055724009179c6cb7b5c45053bc 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -28,13 +28,10 @@ int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); -extern void (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle); -extern void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle); /* * If table not exist, will create it diff --git a/src/mnode/inc/mgmtSystem.h b/src/mnode/inc/mgmtSystem.h index 0ee119043a3d434a7d3772bf5d35c866605e4749..5d71809f36de9ab44797c907c89fad44de0d47f6 100644 --- a/src/mnode/inc/mgmtSystem.h +++ b/src/mnode/inc/mgmtSystem.h @@ -22,14 +22,10 @@ extern "C" { #include +int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); - -extern int32_t (*mgmtInitSystem)(); -extern int32_t (*mgmtCheckMgmtRunning)(); -extern void (*mgmtDoStatistic)(void *handle, void *tmrId); -extern void (*mgmtStopSystem)(); -extern void (*mgmtCleanUpRedirect)(); +void mgmtStopSystem(); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index 15db1680d47d803c0fa0c2eb52ede5580a245659..53db1390e7d864d1f61bfbafe4c1b0890488d9bc 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -26,6 +26,20 @@ extern void *tsUserSdb; extern void *tsDbSdb; static SAcctObj tsAcctObj; +int32_t (*mgmtInitAcctsFp)() = NULL; +void (*mgmtCleanUpAcctsFp)() = NULL; + +int32_t (*mgmtCreateAcctFp)(char *name, char *pass, SAcctCfg *pCfg) = NULL; +int32_t (*mgmtDropAcctFp)(char *name) = NULL; +int32_t (*mgmtAlterAcctFp)(char *name, char *pass, SAcctCfg *pCfg) = NULL; +int32_t (*mgmtGetAcctMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; +int32_t (*mgmtRetrieveAcctsFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; +SAcctObj *(*mgmtGetAcctFp)(char *acctName) = NULL; + +int32_t (*mgmtCheckUserLimitFp)(SAcctObj *pAcct) = NULL; +int32_t (*mgmtCheckDbLimitFp)(SAcctObj *pAcct) = NULL; +int32_t (*mgmtCheckTimeSeriesLimitFp)(SAcctObj *pAcct, int32_t numOfTimeSeries) = NULL; + int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) { pthread_mutex_lock(&pAcct->mutex); pDb->next = pAcct->pHead; @@ -97,73 +111,90 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) { return 0; } -int32_t mgmtInitAcctsImp() { - SAcctObj *pAcct = &tsAcctObj; - pAcct->acctId = 0; - strcpy(pAcct->user, "root"); - return 0; +int32_t mgmtInitAccts() { + if (mgmtInitAcctsFp) { + return mgmtInitAcctsFp(); + } else { + SAcctObj *pAcct = &tsAcctObj; + pAcct->acctId = 0; + strcpy(pAcct->user, "root"); + return 0; + } } -int32_t (*mgmtInitAccts)() = mgmtInitAcctsImp; - -static SAcctObj *mgmtGetAcctImp(char *acctName) { - return &tsAcctObj; +SAcctObj *mgmtGetAcct(char *acctName) { + if (mgmtGetAcctFp) { + return mgmtGetAcctFp(acctName); + } else { + return &tsAcctObj; + } } -SAcctObj *(*mgmtGetAcct)(char *acctName) = mgmtGetAcctImp; - -static int32_t mgmtCheckUserLimitImp(SAcctObj *pAcct) { - int32_t numOfUsers = sdbGetNumOfRows(tsUserSdb); - if (numOfUsers >= tsMaxUsers) { - mWarn("numOfUsers:%d, exceed tsMaxUsers:%d", numOfUsers, tsMaxUsers); - return TSDB_CODE_TOO_MANY_USERS; +int32_t mgmtCheckUserLimit(SAcctObj *pAcct) { + if (mgmtCheckUserLimitFp) { + return mgmtCheckUserLimitFp(pAcct); + } else { + int32_t numOfUsers = sdbGetNumOfRows(tsUserSdb); + if (numOfUsers >= tsMaxUsers) { + mWarn("numOfUsers:%d, exceed tsMaxUsers:%d", numOfUsers, tsMaxUsers); + return TSDB_CODE_TOO_MANY_USERS; + } + return 0; } - return 0; } -int32_t (*mgmtCheckUserLimit)(SAcctObj *pAcct) = mgmtCheckUserLimitImp; - -static int32_t mgmtCheckDbLimitImp(SAcctObj *pAcct) { - int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb); - if (numOfDbs >= tsMaxDbs) { - mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); - return TSDB_CODE_TOO_MANY_DATABASES; +int32_t mgmtCheckDbLimit(SAcctObj *pAcct) { + if (mgmtCheckDbLimitFp) { + return mgmtCheckDbLimitFp(pAcct); + } else { + int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb); + if (numOfDbs >= tsMaxDbs) { + mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); + return TSDB_CODE_TOO_MANY_DATABASES; + } + return 0; } - return 0; } -int32_t (*mgmtCheckDbLimit)(SAcctObj *pAcct) = mgmtCheckDbLimitImp; - -static int32_t mgmtCheckTableLimitImp(SAcctObj *pAcct, SCreateTableMsg *pCreate) { - return 0; +int32_t mgmtCheckTableLimit(SAcctObj *pAcct, int32_t numOfTimeSeries) { + if (mgmtCheckTimeSeriesLimitFp) { + return mgmtCheckTimeSeriesLimitFp(pAcct, numOfTimeSeries); + } else { + return 0; + } } -int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate) = mgmtCheckTableLimitImp; - -static void mgmtCleanUpAcctsImp() { +void mgmtCleanUpAccts() { + if (mgmtCleanUpAcctsFp) { + mgmtCleanUpAcctsFp(); + } } -void (*mgmtCleanUpAccts)() = mgmtCleanUpAcctsImp; - -static int32_t mgmtGetAcctMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - return TSDB_CODE_OPS_NOT_SUPPORT; +int32_t mgmtGetAcctMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + if (mgmtGetAcctMetaFp) { + return mgmtGetAcctMetaFp(pMeta, pShow, pConn); + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } } -int32_t (*mgmtGetAcctMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetAcctMetaImp; - -static int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - return 0; +int32_t mgmtRetrieveAccts(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + if (mgmtRetrieveAcctsFp) { + return mgmtRetrieveAcctsFp(pShow, data, rows, pConn); + } else { + return 0; + } } -int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveAcctsImp; - SAcctObj *mgmtGetAcctFromConn(void *pConn) { SRpcConnInfo connInfo; rpcGetConnInfo(pConn, &connInfo); - SUserObj *pUser = mgmtGetUser(connInfo.user); - if(pUser != NULL) { + + SUserObj *pUser = mgmtGetUser(connInfo.user); + if (pUser != NULL) { return pUser->pAcct; } return NULL; } + diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index cf1c51ad9009903b4a6c5aea10c7a098d7b6df1e..1e5fc54c5a82102b5a0dd8db840be8d1ccc0bc63 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -21,19 +21,42 @@ #include "mgmtBalance.h" #include "mgmtDnode.h" -void mgmtStartBalanceTimerImp(int64_t mseconds) {} -void (*mgmtStartBalanceTimer)(int64_t mseconds) = mgmtStartBalanceTimerImp; +void (*mgmtStartBalanceTimerFp)(int64_t mseconds) = NULL; +int32_t (*mgmtInitBalanceFp)() = NULL; +void (*mgmtCleanupBalanceFp)() = NULL; +int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL; +char * (*mgmtGetVnodeStatusFp)(SVgObj *pVgroup, SVnodeGid *pVnode) = NULL; + +void mgmtStartBalanceTimer(int64_t mseconds) { + if (mgmtStartBalanceTimerFp) { + (*mgmtStartBalanceTimerFp)(mseconds); + } +} -int32_t mgmtInitBalanceImp() { return 0; } -int32_t (*mgmtInitBalance)() = mgmtInitBalanceImp; +int32_t mgmtInitBalance() { + if (mgmtInitBalanceFp) { + return (*mgmtInitBalanceFp)(); + } else { + return 0; + } +} -void mgmtCleanupBalanceImp() {} -void (*mgmtCleanupBalance)() = mgmtCleanupBalanceImp; +void mgmtCleanupBalance() { + if (mgmtCleanupBalanceFp) { + (*mgmtCleanupBalanceFp)(); + } +} -int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) { - int selectedVnode = -1; - SDnodeObj *pDnode = &tsDnodeObj; - int lastAllocVode = pDnode->lastAllocVnode; +int32_t mgmtAllocVnodes(SVgObj *pVgroup) { + if (mgmtAllocVnodesFp) { + return mgmtAllocVnodesFp(pVgroup); + } + + SDnodeObj *pDnode = mgmtGetDnode(0); + if (pDnode == NULL) return TSDB_CODE_OTHERS; + + int selectedVnode = -1; + int lastAllocVode = pDnode->lastAllocVnode; for (int i = 0; i < pDnode->numOfVnodes; i++) { int vnode = (i + lastAllocVode) % pDnode->numOfVnodes; @@ -49,44 +72,16 @@ int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) { } else { mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode); pVgroup->vnodeGid[0].vnode = selectedVnode; - pDnode->lastAllocVnode = selectedVnode + 1; + pDnode->lastAllocVnode = selectedVnode + 1; if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; return 0; } } -int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup) = mgmtAllocVnodesImp; - -bool mgmtCheckModuleInDnodeImp(SDnodeObj *pDnode, int moduleType) { - return tsModule[moduleType].num != 0; -} - -bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType) = mgmtCheckModuleInDnodeImp; - -char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) { - return "master"; -} - -char *(*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtGetVnodeStatusImp; - -bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { - return true; -} - -bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtCheckVnodeReadyImp; - -void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) { -} - -void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus) = mgmtUpdateDnodeStateImp; -void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) { -} - -void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp) = mgmtUpdateVgroupStateImp; - -bool mgmtAddVnodeImp(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { - return false; +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { + if (mgmtGetVnodeStatusFp) { + return (*mgmtGetVnodeStatusFp)(pVgroup, pVnode); + } else { + return "master"; + } } - -bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) = mgmtAddVnodeImp; - diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index e2b141fb64edb1507cf7758bc56f16b209614af4..02771b19c9018f3c98fbd2361c922918ba12ab0e 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -442,7 +442,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName // // mgmtMeterActionEncode(pTable, msg, size, &rowSize); // -// int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1); // Need callback function +// int32_t ret = sdbUpdateRow(tsChildTableSdb, msg, rowSize, 1); // Need callback function // tfree(msg); // // if (pTable->isDirty) pTable->isDirty = 0; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 857e05fddc7297b4349b65b067ccb510fb02f28d..bedd51dcffa23cc791b301acc708778b7db60490 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -409,68 +409,69 @@ void mgmtMonitorDbDrop(void *unused, void *unusedt) { } int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { - int32_t code = TSDB_CODE_SUCCESS; - - SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db); - if (pDb == NULL) { - mTrace("db:%s is not exist", pAlter->db); - return TSDB_CODE_INVALID_DB; - } - - int32_t oldReplicaNum = pDb->cfg.replications; - if (pAlter->daysToKeep > 0) { - mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); - pDb->cfg.daysToKeep = pAlter->daysToKeep; - } else if (pAlter->replications > 0) { - mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications); - if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) { - mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); - return TSDB_CODE_INVALID_OPTION; - } - pDb->cfg.replications = pAlter->replications; - } else if (pAlter->maxSessions > 0) { - mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions); - if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) { - mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); - return TSDB_CODE_INVALID_OPTION; - } - if (pAlter->maxSessions < pDb->cfg.maxSessions) { - mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions); - return TSDB_CODE_INVALID_OPTION; - } - return TSDB_CODE_INVALID_OPTION; - //The modification of tables needs to rewrite the head file, so disable this option - //pDb->cfg.maxSessions = pAlter->maxSessions; - } else { - mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name, - pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep, - pDb->cfg.replications, pDb->cfg.daysToKeep); - return TSDB_CODE_INVALID_OPTION; - } - - if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) { - return TSDB_CODE_SDB_ERROR; - } - - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); - if (oldReplicaNum < pDb->cfg.replications) { - if (!mgmtAddVnode(pVgroup, NULL, NULL)) { - mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); - code = TSDB_CODE_NO_ENOUGH_DNODES; - } - } - if (pAlter->maxSessions > 0) { - //rebuild meterList in mgmtVgroup.c - mgmtUpdateVgroup(pVgroup); - } -// mgmtSendCreateVnodeMsg(pVgroup); - pVgroup = pVgroup->next; - } - mgmtStartBalanceTimer(10); - - return code; + return 0; +// int32_t code = TSDB_CODE_SUCCESS; +// +// SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db); +// if (pDb == NULL) { +// mTrace("db:%s is not exist", pAlter->db); +// return TSDB_CODE_INVALID_DB; +// } +// +// int32_t oldReplicaNum = pDb->cfg.replications; +// if (pAlter->daysToKeep > 0) { +// mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); +// pDb->cfg.daysToKeep = pAlter->daysToKeep; +// } else if (pAlter->replications > 0) { +// mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications); +// if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) { +// mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); +// return TSDB_CODE_INVALID_OPTION; +// } +// pDb->cfg.replications = pAlter->replications; +// } else if (pAlter->maxSessions > 0) { +// mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions); +// if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) { +// mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); +// return TSDB_CODE_INVALID_OPTION; +// } +// if (pAlter->maxSessions < pDb->cfg.maxSessions) { +// mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions); +// return TSDB_CODE_INVALID_OPTION; +// } +// return TSDB_CODE_INVALID_OPTION; +// //The modification of tables needs to rewrite the head file, so disable this option +// //pDb->cfg.maxSessions = pAlter->maxSessions; +// } else { +// mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name, +// pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep, +// pDb->cfg.replications, pDb->cfg.daysToKeep); +// return TSDB_CODE_INVALID_OPTION; +// } +// +// if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) { +// return TSDB_CODE_SDB_ERROR; +// } +// +// SVgObj *pVgroup = pDb->pHead; +// while (pVgroup != NULL) { +// mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); +// if (oldReplicaNum < pDb->cfg.replications) { +// if (!mgmtAddVnode(pVgroup, NULL, NULL)) { +// mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); +// code = TSDB_CODE_NO_ENOUGH_DNODES; +// } +// } +// if (pAlter->maxSessions > 0) { +// //rebuild meterList in mgmtVgroup.c +// mgmtUpdateVgroup(pVgroup); +// } +//// mgmtSendCreateVnodeMsg(pVgroup); +// pVgroup = pVgroup->next; +// } +// mgmtStartBalanceTimer(10); +// +// return code; } int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 5a9e9aff358dfe686f23a4422c8200bc3bd87e36..20043c380081f86fbc025f2ce849cb4d9157c0da 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -24,7 +24,17 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" -SDnodeObj tsDnodeObj; +int32_t (*mgmtInitDnodesFp)() = NULL; +void (*mgmtCleanUpDnodesFp)() = NULL; +SDnodeObj *(*mgmtGetDnodeFp)(uint32_t ip) = NULL; +int32_t (*mgmtGetDnodesNumFp)() = NULL; +int32_t (*mgmtUpdateDnodeFp)(SDnodeObj *pDnode) = NULL; +void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL; +int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; +int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; +void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL; + +static SDnodeObj tsDnodeObj = {0}; void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; @@ -154,7 +164,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } pShow->numOfRows = mgmtGetDnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; @@ -165,9 +177,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; + int32_t cols = 0; SDnodeObj *pDnode = NULL; char *pWrite; - int32_t cols = 0; char ipstr[20]; while (numOfRows < rows) { @@ -213,6 +225,11 @@ int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon return numOfRows; } +bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { + uint32_t status = pDnode->moduleStatus & (1 << moduleType); + return status > 0; +} + int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; @@ -517,85 +534,102 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon return numOfRows; } -SDnodeObj *mgmtGetDnodeImp(uint32_t ip) { - return &tsDnodeObj; -} - -SDnodeObj *(*mgmtGetDnode)(uint32_t ip) = mgmtGetDnodeImp; +int32_t mgmtInitDnodes() { + if (mgmtInitDnodesFp) { + return mgmtInitDnodesFp(); + } else { + tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; + tsDnodeObj.createdTime = taosGetTimestampMs(); + tsDnodeObj.lastReboot = taosGetTimestampSec(); + tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores; + tsDnodeObj.status = TSDB_DN_STATUS_READY; + tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; + tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; + tsDnodeObj.thandle = (void *) (1); //hack way + tsDnodeObj.status = TSDB_DN_STATUS_READY; + mgmtSetDnodeMaxVnodes(&tsDnodeObj); -int32_t mgmtUpdateDnodeImp(SDnodeObj *pDnode) { - return 0; + tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT); + if (tsEnableHttpModule) { + tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_HTTP); + } + if (tsEnableMonitorModule) { + tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR); + } + return 0; + } } -int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode) = mgmtUpdateDnodeImp; - -void mgmtCleanUpDnodesImp() { +void mgmtCleanUpDnodes() { + if (mgmtCleanUpDnodesFp) { + mgmtCleanUpDnodesFp(); + } } -void (*mgmtCleanUpDnodes)() = mgmtCleanUpDnodesImp; - -int32_t mgmtInitDnodesImp() { - tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; - tsDnodeObj.createdTime = taosGetTimestampMs(); - tsDnodeObj.lastReboot = taosGetTimestampSec(); - tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores; - tsDnodeObj.status = TSDB_DN_STATUS_READY; - tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; - tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; - tsDnodeObj.thandle = (void *) (1); //hack way - if (tsDnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) { - mgmtSetDnodeMaxVnodes(&tsDnodeObj); - mPrint("dnode first access, set total vnodes:%d", tsDnodeObj.numOfVnodes); +SDnodeObj *mgmtGetDnode(uint32_t ip) { + if (mgmtGetDnodeFp) { + return mgmtGetDnodeFp(ip); + } else { + return &tsDnodeObj; } - - tsDnodeObj.status = TSDB_DN_STATUS_READY; - return 0; } -int32_t (*mgmtInitDnodes)() = mgmtInitDnodesImp; - -int32_t mgmtGetDnodesNumImp() { - return 1; +int32_t mgmtGetDnodesNum() { + if (mgmtGetDnodesNumFp) { + return mgmtGetDnodesNumFp(); + } else { + return 1; + } } -int32_t (*mgmtGetDnodesNum)() = mgmtGetDnodesNumImp; +int32_t mgmtUpdateDnode(SDnodeObj *pDnode) { + if (mgmtUpdateDnodeFp) { + return mgmtUpdateDnodeFp(pDnode); + } else { + return 0; + } +} -void *mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) { - if (*pDnode == NULL) { - *pDnode = &tsDnodeObj; +void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { + if (mgmtGetNextDnodeFp) { + return mgmtGetNextDnodeFp(pShow, pDnode); } else { - *pDnode = NULL; + if (*pDnode == NULL) { + *pDnode = &tsDnodeObj; + } else { + *pDnode = NULL; + } } return *pDnode; } -void *(*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp; - -int32_t mgmtGetScoresMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - return TSDB_CODE_OPS_NOT_SUPPORT; +int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + if (mgmtGetScoresMetaFp) { + return mgmtGetScoresMetaFp(pMeta, pShow, pConn); + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } } -int32_t (*mgmtGetScoresMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetScoresMetaImp; - -int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - return 0; +int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + if (mgmtRetrieveScoresFp) { + return mgmtRetrieveScoresFp(pShow, data, rows, pConn); + } else { + return 0; + } } -int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveScoresImp; - -void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) { +void mgmtSetDnodeUnRemove(SDnodeObj *pDnode) { + if (mgmtSetDnodeUnRemoveFp) { + mgmtSetDnodeUnRemoveFp(pDnode); + } } -void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode) = mgmtSetDnodeUnRemoveImp; - -bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) { +bool mgmtCheckConfigShow(SGlobalConfig *cfg) { if (cfg->cfgType & TSDB_CFG_CTYPE_B_CLUSTER) return false; if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) return false; return true; } - -bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp; - diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 2169d6731d661afaf31a7794c393c167018435ff..5a365f220a7098257ac7ea5e1273c17970671eaf 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -26,15 +26,21 @@ #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtDnodeInt.h" +#include "mgmtGrant.h" #include "mgmtProfile.h" #include "mgmtShell.h" #include "mgmtTable.h" #include "mgmtVgroup.h" +int32_t (*mgmtInitDnodeIntFp)() = NULL; +void (*mgmtCleanUpDnodeIntFp)() = NULL; + void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL; void *mgmtStatusTimer = NULL; +static void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); + static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { int32_t contLen = *(int32_t *) (sched->msg - 4); int32_t code = *(int32_t *) (sched->msg - 8); @@ -225,6 +231,14 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo } } +static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) { + if (mgmtUpdateGrantInfoFp) { + mgmtUpdateGrantInfoFp(pCont); + mTrace("grant info is updated"); + } + rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0); +} + void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { mError("invalid msg type:%d", msgType); @@ -249,6 +263,10 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { + } else if (msgType == TSDB_MSG_TYPE_STATUS) { + mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code); + } else if (msgType == TSDB_MSG_TYPE_GRANT) { + mgmtProcessDnodeGrantMsg(pCont, pConn); } else { mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); } @@ -256,8 +274,6 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p //rpcFreeCont(pCont); } - - void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle); } @@ -317,108 +333,113 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { } int32_t mgmtSendCfgDnodeMsg(char *cont) { -#ifdef CLUSTER - char * pMsg, *pStart; - int32_t msgLen = 0; -#endif - - SDnodeObj *pDnode; - SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont; - uint32_t ip; - - ip = inet_addr(pCfg->ip); - pDnode = mgmtGetDnode(ip); - if (pDnode == NULL) { - mError("dnode ip:%s not configured", pCfg->ip); - return TSDB_CODE_NOT_CONFIGURED; - } - - mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config); - int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config); - if (code != -1) { - return code; - } - -#ifdef CLUSTER - pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG); - if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE; - pMsg = pStart; - - memcpy(pMsg, cont, sizeof(SCfgDnodeMsg)); - pMsg += sizeof(SCfgDnodeMsg); - - msgLen = pMsg - pStart; - mgmtSendMsgToDnode(pDnode, pStart, msgLen); -#else - (void)tsCfgDynamicOptions(pCfg->config); -#endif - return 0; +//#ifdef CLUSTER +// char * pMsg, *pStart; +// int32_t msgLen = 0; +//#endif +// +// SDnodeObj *pDnode; +// SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont; +// uint32_t ip; +// +// ip = inet_addr(pCfg->ip); +// pDnode = mgmtGetDnode(ip); +// if (pDnode == NULL) { +// mError("dnode ip:%s not configured", pCfg->ip); +// return TSDB_CODE_NOT_CONFIGURED; +// } +// +// mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config); +// int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config); +// if (code != -1) { +// return code; +// } +// +//#ifdef CLUSTER +// pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG); +// if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE; +// pMsg = pStart; +// +// memcpy(pMsg, cont, sizeof(SCfgDnodeMsg)); +// pMsg += sizeof(SCfgDnodeMsg); +// +// msgLen = pMsg - pStart; +// mgmtSendMsgToDnode(pDnode, pStart, msgLen); +//#else +// (void)tsCfgDynamicOptions(pCfg->config); +//#endif +// return 0; } -int32_t mgmtInitDnodeIntImp() { return 0; } -int32_t (*mgmtInitDnodeInt)() = mgmtInitDnodeIntImp; - -void mgmtCleanUpDnodeIntImp() {} -void (*mgmtCleanUpDnodeInt)() = mgmtCleanUpDnodeIntImp; - -void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { -/* - SDnodeObj *pObj = &tsDnodeObj; - pObj->openVnodes = tsOpenVnodes; - pObj->status = TSDB_DN_STATUS_READY; +int32_t mgmtInitDnodeInt() { + if (mgmtInitDnodeIntFp) { + return mgmtInitDnodeIntFp(); + } else { + return 0; + } +} - float memoryUsedMB = 0; - taosGetSysMemory(&memoryUsedMB); - pObj->diskAvailable = tsAvailDataDirGB; - - for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { - SVnodeLoad *pVload = &(pObj->vload[vnode]); - SVnodeObj * pVnode = vnodeList + vnode; - - // wait vnode dropped - if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { - if (vnodeList[vnode].cfg.maxSessions <= 0) { - pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; - pVload->status = TSDB_VN_STATUS_OFFLINE; - mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode); - taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr); - } - } +void mgmtCleanUpDnodeInt() { + if (mgmtCleanUpDnodeIntFp) { + mgmtCleanUpDnodeIntFp(); + } +} - if (vnodeList[vnode].cfg.maxSessions <= 0) { - continue; - } +void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { + SStatusMsg *pStatus = (SStatusMsg *)pCont; - pVload->vnode = vnode; - pVload->status = TSDB_VN_STATUS_MASTER; - pVload->totalStorage = pVnode->vnodeStatistic.totalStorage; - pVload->compStorage = pVnode->vnodeStatistic.compStorage; - pVload->pointsWritten = pVnode->vnodeStatistic.pointsWritten; - uint32_t vgId = pVnode->cfg.vgId; - - SVgObj *pVgroup = mgmtGetVgroup(vgId); - if (pVgroup == NULL) { - mError("vgroup:%d is not there, but associated with vnode %d", vgId, vnode); - pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING; - continue; - } + SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp)); + if (pObj == NULL) { + mError("dnode:%s not exist", taosIpStr(pObj->privateIp)); + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_DNODE_NOT_EXIST, NULL, 0); + return; + } - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("vgroup:%d not belongs to any database, vnode:%d", vgId, vnode); - continue; - } + pObj->lastReboot = htonl(pStatus->lastReboot); + pObj->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); + pObj->openVnodes = htons(pStatus->openVnodes); + pObj->numOfCores = htons(pStatus->numOfCores); + pObj->diskAvailable = pStatus->diskAvailable; + pObj->alternativeRole = pStatus->alternativeRole; +// +// if (mgmtProcessDnodeStatusFp) { +// mgmtProcessDnodeStatusFp(pStatus, pObj, pConn); +// return; +// } - if (pVload->vgId == 0 || pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { - mError("vid:%d, mgmt not exist, drop it", vnode); - pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING; - } - } + pObj->status = TSDB_DN_STATUS_READY; - taosTmrReset(mgmtProcessDnodeStatus, tsStatusInterval * 1000, NULL, tsMgmtTmr, &mgmtStatusTimer); - if (mgmtStatusTimer == NULL) { - mError("Failed to start status timer"); - } -*/ +// // wait vnode dropped +// for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { +// SVnodeLoad *pVload = &(pObj->vload[vnode]); +// if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { +// bool existInDnode = false; +// for (int32_t j = 0; j < pObj->openVnodes; ++j) { +// if (htonl(pStatus->load[j].vnode) == vnode) { +// existInDnode = true; +// break; +// } +// } +// +// if (!existInDnode) { +// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; +// pVload->status = TSDB_VN_STATUS_OFFLINE; +// mgmtUpdateDnode(pObj); +// mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode); +// taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr); +// } +// } else if (pVload->vgId == 0) { +// /* +// * In some cases, vnode information may be reported abnormally, recover it +// */ +// if (pVload->dropStatus != TSDB_VN_DROP_STATUS_READY || pVload->status != TSDB_VN_STATUS_OFFLINE) { +// mPrint("dnode:%s, vid:%d, vgroup:%d status:%s dropStatus:%s, set it to avail status", +// taosIpStr(pObj->privateIp), vnode, pVload->vgId, taosGetVnodeStatusStr(pVload->status), +// taosGetVnodeDropStatusStr(pVload->dropStatus)); +// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; +// pVload->status = TSDB_VN_STATUS_OFFLINE; +// mgmtUpdateDnode(pObj); +// } +// } +// } } -void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp; diff --git a/src/mnode/src/mgmtGrant.c b/src/mnode/src/mgmtGrant.c index 37a0753c23eb77db07d054b6227f0457f2bcf289..c24fb82aa6e3ccfb40cbecfd79548a7b258edc8d 100644 --- a/src/mnode/src/mgmtGrant.c +++ b/src/mnode/src/mgmtGrant.c @@ -18,32 +18,79 @@ #include "mnode.h" #include "mgmtAcct.h" #include "mgmtGrant.h" +#include "mgmtUser.h" -int32_t mgmtCheckUserGrantImp() { return 0; } -int32_t (*mgmtCheckUserGrant)() = mgmtCheckUserGrantImp; +int32_t (*mgmtCheckUserGrantFp)() = NULL; +int32_t (*mgmtCheckDbGrantFp)() = NULL; +void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; +void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; +int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL; +bool (*mgmtCheckExpiredFp)() = NULL; +int32_t (*mgmtGetGrantsMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; +int32_t (*mgmtRetrieveGrantsFp)(SShowObj *pShow, char *data, int rows, void *pConn) = NULL; +void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL; -int32_t mgmtCheckDbGrantImp() { return 0; } -int32_t (*mgmtCheckDbGrant)() = mgmtCheckDbGrantImp; +int32_t mgmtCheckUserGrant() { + if (mgmtCheckUserGrantFp) { + return mgmtCheckUserGrantFp(); + } else { + return 0; + } +} + +int32_t mgmtCheckDbGrant() { + if (mgmtCheckDbGrantFp) { + return mgmtCheckDbGrantFp(); + } else { + return 0; + } +} -void mgmtAddTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { +void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; + if (mgmtAddTimeSeriesFp) { + mgmtAddTimeSeriesFp(timeSeriesNum); + } } -void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtAddTimeSeriesImp; -void mgmtRestoreTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { +void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; + if (mgmtRestoreTimeSeriesFp) { + mgmtRestoreTimeSeriesFp(timeSeriesNum); + } } -void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtRestoreTimeSeriesImp; - -int32_t mgmtCheckTimeSeriesImp(uint32_t timeseries) { return 0; } -int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries) = mgmtCheckTimeSeriesImp; -bool mgmtCheckExpiredImp() { return false; } -bool (*mgmtCheckExpired)() = mgmtCheckExpiredImp; +int32_t mgmtCheckTimeSeries(uint32_t timeseries) { + if (mgmtCheckTimeSeriesFp) { + return mgmtCheckTimeSeriesFp(timeseries); + } else { + return 0; + } +} -int32_t mgmtGetGrantsMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; } -int32_t (*mgmtGetGrantsMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetGrantsMetaImp; +bool mgmtCheckExpired() { + if (mgmtCheckExpiredFp) { + return mgmtCheckExpiredFp(); + } else { + return false; + } +} -int32_t mgmtRetrieveGrantsImp(SShowObj *pShow, char *data, int rows, void *pConn) { return 0; } -int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn) = mgmtRetrieveGrantsImp; +int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + if (mgmtGetGrantsMetaFp) { + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + return mgmtGetGrantsMetaFp(pMeta, pShow, pConn); + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } +} +int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + if (mgmtRetrieveGrantsFp) { + return mgmtRetrieveGrantsFp(pShow, data, rows, pConn); + } else { + return 0; + } +} diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index cb9e99135ae992eae8f062bf738264d2d1e7e330..1c60312f3eebb0bd0e8909f4e7b4aae003070fae 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -14,16 +14,134 @@ */ #define _DEFAULT_SOURCE +#include "tschemautil.h" #include "mgmtMnode.h" +#include "mgmtUser.h" -int32_t mgmtGetMnodeMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} +void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; +int32_t (*mgmtInitMnodesFp)() = NULL; +int32_t (*mgmtGetMnodesNumFp)() = NULL; + +static int32_t mgmtGetMnodesNum(); +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode); + +int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + int32_t cols = 0; + + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + SSchema *pSchema = tsGetSchema(pMeta); + + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "IP"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; -int32_t (*mgmtGetMnodeMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetMnodeMetaImp; + pShow->bytes[cols] = 10; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "status"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 10; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "role"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "public ip"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = mgmtGetMnodesNum(); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + pShow->pNode = NULL; -int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { return 0; } -int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveMnodesImp; +int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + int32_t cols = 0; + SSdbPeer *pMnode = NULL; + char *pWrite; + char ipstr[20]; + + while (numOfRows < rows) { + pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode); + + +// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode); +// if (pMnode == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, pMnode->ipstr); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pMnode->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, sdbStatusStr[(uint8_t)pMnode->status]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, sdbRoleStr[(uint8_t)pMnode->role]); + cols++; + + tinet_ntoa(ipstr, pMnode->publicIp); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, ipstr); + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static int32_t mgmtGetMnodesNum() { + if (mgmtGetMnodesNumFp) { + return mgmtGetMnodesNumFp(); + } else { + return 1; + } +} + +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { + if (mgmtGetNextMnodeFp) { + return mgmtGetNextMnodeFp(pShow, pMnode); + } else { + if (*pMnode == NULL) { + *pMnode = NULL; + } else { + *pMnode = NULL; + } + } + + return *pMnode; +} \ No newline at end of file diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 598ec600db73452549a880076eb286dce00d1390..2aadd8963c846fb00ae7624c131a0da10a63c4f0 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosmsg.h" +#include "taoserror.h" #include "tlog.h" #include "trpc.h" #include "tstatus.h" @@ -51,6 +52,7 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); +uint32_t mgmtAccessSquence = 0; void *tsShellConnServer = NULL; void mgmtProcessTranRequest(SSchedMsg *sched) { @@ -1047,38 +1049,6 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a // rpcFreeCont(pCont); } -void mgmtInitProcessShellMsg() { - mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DB] = mgmtProcessDropDbMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUnSupportMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_USER] = mgmtProcessAlterUserMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_USER] = mgmtProcessDropUserMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_ACCT] = mgmtProcessCreateAcctMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_ACCT] = mgmtProcessDropAcctMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_ACCT] = mgmtProcessAlterAcctMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_TABLE] = mgmtProcessCreateTableMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_TABLE] = mgmtProcessDropTableMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DNODE] = mgmtProcessCreateDnodeMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessUnSupportMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessTableMetaMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiTableMetaMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessSuperTableMetaMsg; -} - void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb == NULL) { @@ -1194,10 +1164,167 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); } -void (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; \ No newline at end of file + +static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtAlterAcctFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SAlterAcctMsg *pAlter = pCont; + pAlter->cfg.maxUsers = htonl(pAlter->cfg.maxUsers); + pAlter->cfg.maxDbs = htonl(pAlter->cfg.maxDbs); + pAlter->cfg.maxTimeSeries = htonl(pAlter->cfg.maxTimeSeries); + pAlter->cfg.maxConnections = htonl(pAlter->cfg.maxConnections); + pAlter->cfg.maxStreams = htonl(pAlter->cfg.maxStreams); + pAlter->cfg.maxPointsPerSecond = htonl(pAlter->cfg.maxPointsPerSecond); + pAlter->cfg.maxStorage = htobe64(pAlter->cfg.maxStorage); + pAlter->cfg.maxQueryTime = htobe64(pAlter->cfg.maxQueryTime); + pAlter->cfg.maxInbound = htobe64(pAlter->cfg.maxInbound); + pAlter->cfg.maxOutbound = htobe64(pAlter->cfg.maxOutbound); + + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("account:%s, failed to alter account, need redirect message", pAlter->user); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("account:%s, failed to alter account, invalid user", pAlter->user); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("account:%s, failed to alter account, no rights", pAlter->user); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = mgmtAlterAcctFp(pAlter->user, pAlter->pass, &(pAlter->cfg));; + if (code == TSDB_CODE_SUCCESS) { + mLPrint("account:%s is altered by %s", pAlter->user, pUser->user); + } else { + mError("account:%s, failed to alter account, reason:%s", pAlter->user, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + +static void mgmtProcessDropAcctMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtDropAcctFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SDropAcctMsg *pDrop = (SDropAcctMsg *) pCont; + + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("account:%s, failed to drop account, need redirect message", pDrop->user); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("account:%s, failed to drop account, invalid user", pDrop->user); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("account:%s, failed to drop account, no rights", pDrop->user); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = mgmtDropAcctFp(pDrop->user); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("account:%s is dropped by %s", pDrop->user, pUser->user); + } else { + mError("account:%s, failed to drop account, reason:%s", pDrop->user, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + +static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtCreateAcctFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SCreateAcctMsg *pCreate = (SCreateAcctMsg *) pCont; + pCreate->cfg.maxUsers = htonl(pCreate->cfg.maxUsers); + pCreate->cfg.maxDbs = htonl(pCreate->cfg.maxDbs); + pCreate->cfg.maxTimeSeries = htonl(pCreate->cfg.maxTimeSeries); + pCreate->cfg.maxConnections = htonl(pCreate->cfg.maxConnections); + pCreate->cfg.maxStreams = htonl(pCreate->cfg.maxStreams); + pCreate->cfg.maxPointsPerSecond = htonl(pCreate->cfg.maxPointsPerSecond); + pCreate->cfg.maxStorage = htobe64(pCreate->cfg.maxStorage); + pCreate->cfg.maxQueryTime = htobe64(pCreate->cfg.maxQueryTime); + pCreate->cfg.maxInbound = htobe64(pCreate->cfg.maxInbound); + pCreate->cfg.maxOutbound = htobe64(pCreate->cfg.maxOutbound); + + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("account:%s, failed to create account, need redirect message", pCreate->user); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("account:%s, failed to create account, invalid user", pCreate->user); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("account:%s, failed to create account, no rights", pCreate->user); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = mgmtCreateAcctFp(pCreate->user, pCreate->pass, &(pCreate->cfg)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("account:%s is created by %s", pCreate->user, pUser->user); + } else { + mError("account:%s, failed to create account, reason:%s", pCreate->user, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + +void mgmtInitProcessShellMsg() { + mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DB] = mgmtProcessDropDbMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUnSupportMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_USER] = mgmtProcessAlterUserMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_USER] = mgmtProcessDropUserMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_ACCT] = mgmtProcessCreateAcctMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_ACCT] = mgmtProcessDropAcctMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_ACCT] = mgmtProcessAlterAcctMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_TABLE] = mgmtProcessCreateTableMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_TABLE] = mgmtProcessDropTableMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DNODE] = mgmtProcessCreateDnodeMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessUnSupportMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessTableMetaMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiTableMetaMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessSuperTableMetaMsg; +} diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index 1eb114aa8974f914b37fe73284858d8c43707fe7..e36788a5ddb79896d9e89a958bdcba2999af784a 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" +#include "tmodule.h" #include "tsched.h" #include "mnode.h" #include "mgmtAcct.h" @@ -32,13 +33,11 @@ char tsMgmtDirectory[128] = {0}; void *tsMgmtTmr = NULL; void *tsMgmtTranQhandle = NULL; -void *tsMgmtStatisTimer = NULL; + void mgmtCleanUpSystem() { mPrint("starting to clean up mgmt"); - taosTmrStopA(&tsMgmtStatisTimer); - mgmtCleanUpRedirect(); sdbCleanUpPeers(); mgmtCleanupBalance(); mgmtCleanUpDnodeInt(); @@ -55,6 +54,20 @@ void mgmtCleanUpSystem() { mPrint("mgmt is cleaned up"); } +int32_t mgmtCheckMgmtRunning() { + if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) { + return -1; + } + + tsetModuleStatus(TSDB_MOD_MGMT); + +// strcpy(sdbMasterIp, mgmtIpStr[0]); +// strcpy(sdbPrivateIp, tsPrivateIp); +// sdbPublicIp = inet_addr(tsPublicIp); + + return 0; +} + int32_t mgmtStartSystem() { mPrint("starting to initialize TDengine mgmt ..."); @@ -111,10 +124,10 @@ int32_t mgmtStartSystem() { return -1; } - if (mgmtInitShell() < 0) { - mError("failed to init shell"); - return -1; - } +// if (mgmtInitShell() < 0) { +// mError("failed to init shell"); +// return -1; +// } if (sdbInitPeers(tsMgmtDirectory) < 0) { mError("failed to init peers"); @@ -125,39 +138,38 @@ int32_t mgmtStartSystem() { mError("failed to init dnode balance") } - taosTmrReset(mgmtDoStatistic, tsStatusInterval * 30000, NULL, tsMgmtTmr, &tsMgmtStatisTimer); mPrint("TDengine mgmt is initialized successfully"); return 0; } -int32_t mgmtInitSystemImp() { - int32_t code = mgmtStartSystem(); - if (code != 0) { - return code; - } +int32_t mgmtInitSystem() { + struct stat dirstat; + bool directoryExist = (stat(tsMgmtDirectory, &dirstat) == 0); + bool equalWithMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0); - taosTmrReset(mgmtProcessDnodeStatus, 500, NULL, tsMgmtTmr, &mgmtStatusTimer); - return code; -} + if (equalWithMaster || directoryExist) { + if (mgmtStartSystem() != 0) { + return -1; + } + } -int32_t (*mgmtInitSystem)() = mgmtInitSystemImp; + if (mgmtInitShell() < 0) { + mError("failed to init shell"); + return -1; + } -int32_t mgmtCheckMgmtRunningImp() { return 0; } -int32_t (*mgmtCheckMgmtRunning)() = mgmtCheckMgmtRunningImp; - -void mgmtDoStatisticImp(void *handle, void *tmrId) {} - -void (*mgmtDoStatistic)(void *handle, void *tmrId) = mgmtDoStatisticImp; - -void mgmtStopSystemImp() {} - -void (*mgmtStopSystem)() = mgmtStopSystemImp; - -void mgmtCleanUpRedirectImp() {} +void mgmtStopSystem() { + if (sdbMaster) { + mTrace("it is a master mgmt node, it could not be stopped"); + return; + } -void (*mgmtCleanUpRedirect)() = mgmtCleanUpRedirectImp; + mgmtCleanUpSystem(); + remove(tsMgmtDirectory); +// mgmtInitRedirect(); +} diff --git a/src/os/linux/src/tsystem.c b/src/os/linux/src/tsystem.c index 8cd0e6943616f4ecb1f69ed100ca6535968005ae..ea7b64980f6cf796af26a4a323c60bf041b9c39f 100644 --- a/src/os/linux/src/tsystem.c +++ b/src/os/linux/src/tsystem.c @@ -585,6 +585,7 @@ void tsPrintOsInfo() { pPrint(" os release: %s", buf.release); pPrint(" os version: %s", buf.version); pPrint(" os machine: %s", buf.machine); + pPrint("=================================="); } void taosKillSystem() { diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index 9bbcd60fc4d386b9afc1b52c82b9cbe645193f7a..8b5410a596ae64a27ebecc9e3591fa90de044323 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -48,7 +48,7 @@ typedef struct { char spi:3; // security parameter index char encrypt:3; // encrypt algorithm, 0: no encryption uint16_t tranId; // transcation ID - uint32_t uid; // for unique ID inside a client + uint32_t linkUid; // for unique connection ID assigned by client uint32_t sourceId; // source ID, an index for connection list uint32_t destId; // destination ID, an index for connection list uint32_t destIp; // destination IP address, for NAT scenario diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6ceb1f98bb8e524861c4a14633395e2e02853eb3..51961993a45779773014057831fefe1f3140056d 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -94,8 +94,9 @@ typedef struct _RpcConn { char encrypt; // encryption, 0:1 char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key + char secured; // if set to 1, no authentication uint16_t localPort; // for UDP only - uint32_t peerUid; // peer UID + uint32_t linkUid; // connection unique ID assigned by client uint32_t peerIp; // peer IP uint32_t destIp; // server destination IP to handle NAT uint16_t peerPort; // peer port @@ -264,7 +265,7 @@ void *rpcOpen(SRpcInit *pInit) { return NULL; } } else { - pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); + pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime); if ( pRpc->pCache == NULL ) { tError("%s failed to init connection cache", pRpc->label); rpcClose(pRpc); @@ -399,10 +400,9 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pHead->tranId = pConn->inTranId; pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; - pHead->uid = 0; + pHead->linkUid = pConn->linkUid; pHead->port = htons(pConn->localPort); pHead->code = htonl(code); - memcpy(pHead->user, pConn->user, tListLen(pHead->user)); // set pConn parameters pConn->inType = 0; @@ -417,6 +417,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { taosTmrStopA(&pConn->pTimer); rpcSendMsgToPeer(pConn, msg, msgLen); + pConn->secured = 1; // connection shall be secured return; } @@ -499,7 +500,7 @@ static void rpcCloseConn(void *thandle) { if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; - sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType); + sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); taosDeleteStrHash(pRpc->hash, hashstr); rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; @@ -535,6 +536,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { pConn->sid = sid; pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); + pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); pConn->spi = pRpc->spi; pConn->encrypt = pRpc->encrypt; if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); @@ -548,7 +550,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { char hashstr[40] = {0}; SRpcHead *pHead = (SRpcHead *)pRecv->msg; - sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType); + sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); // check if it is already allocated SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); @@ -567,6 +569,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->sid = sid; pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); + pConn->linkUid = pHead->linkUid; if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey) < 0) { tWarn("%s %p, user not there", pRpc->label, pConn); taosFreeId(pRpc->idPool, sid); // sid shall be released @@ -601,8 +604,8 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { } if (pConn) { - if (memcmp(pConn->user, pHead->user, tListLen(pConn->user)) != 0) { - tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, pHead->user); + if (pConn->linkUid != pHead->linkUid) { + tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid); terrno = TSDB_CODE_MISMATCHED_METER_ID; pConn = NULL; } @@ -748,7 +751,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if (pRecv->port) pConn->peerPort = pRecv->port; if (pHead->port) pConn->peerPort = htons(pHead->port); - if (pHead->uid) pConn->peerUid = pHead->uid; terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); @@ -813,7 +815,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); } - if (pConn && pRpc->idleTime) { + if (pRpc->connType == TAOS_CONN_SERVER && pConn && pRpc->idleTime) { + // only for server, starts the idle timer. For client, it is started by cache mgmt taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } @@ -881,7 +884,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead->tranId = pConn->inTranId; pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; - pHead->uid = 0; + pHead->linkUid = pConn->linkUid; memcpy(pHead->user, pConn->user, tListLen(pHead->user)); pHead->code = htonl(code); @@ -905,7 +908,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { pReplyHead->tranId = pRecvHead->tranId; pReplyHead->sourceId = pRecvHead->destId; pReplyHead->destId = pRecvHead->sourceId; - memcpy(pReplyHead->user, pRecvHead->user, tListLen(pReplyHead->user)); + pReplyHead->linkUid = pRecvHead->linkUid; pReplyHead->code = htonl(code); msgLen = sizeof(SRpcHead); @@ -951,8 +954,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pHead->destId = pConn->peerId; pHead->destIp = pConn->destIp; pHead->port = 0; - pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); - memcpy(pHead->user, pConn->user, tListLen(pHead->user)); + pHead->linkUid = pConn->linkUid; + if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user)); // set the connection parameters pConn->outType = msgType; @@ -1026,8 +1029,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { pConn->retry++; if (pConn->retry < 4) { - tTrace("%s %p, re-send msg:%s to %s:%hu retry:%d", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn->retry); + tTrace("%s %p, re-send msg:%s to %s:%hud", pRpc->label, pConn, + taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { @@ -1179,7 +1182,7 @@ static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) { static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; - if (pConn->spi) { + if (pConn->spi && pConn->secured == 0) { // add auth part pHead->spi = pConn->spi; SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); @@ -1188,6 +1191,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); } else { + pHead->spi = 0; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } @@ -1197,9 +1201,10 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; SRpcInfo *pRpc = pConn->pRpc; - int32_t code = 0; + int code = 0; - if (pConn->spi == 0) { + if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ + // secured link, or no authentication pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); return 0; } @@ -1214,7 +1219,6 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { } code = 0; - if (pHead->spi == pConn->spi) { // authentication SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest)); @@ -1231,6 +1235,8 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { code = TSDB_CODE_AUTH_FAILURE; } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); + if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client + tTrace("%s %p, message is authenticated", pRpc->label, pConn); } } } else { @@ -1243,7 +1249,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { static void rpcLockConn(SRpcConn *pConn) { int64_t tid = taosGetPthreadId(); - int i = 0; + int i = 0; while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) { if (++i % 1000 == 0) { sched_yield(); diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 63c23ce7bc755ef8e492d1b7ada6b49351dfbc2f..aa97535e314b41da4a33c8950855ca87c65ce40a 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -106,11 +106,12 @@ int main(int argc, char *argv[]) { rpcInit.cfp = processResponse; rpcInit.ufp = processUpdateIpSet; rpcInit.sessions = 100; - rpcInit.idleTime = 2000; + rpcInit.idleTime = tsShellActivityTimer*1000; rpcInit.user = "michael"; rpcInit.secret = "mypassword"; rpcInit.ckey = "key"; rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; for (int i=1; i