diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 91670d3e7a6f4d0accb19aca9613cce44005c7b6..34768b00dfa0da5fbb20e8e2c22fa0294e6864ba 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,11 +4,11 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(rpc) -# ADD_SUBDIRECTORY(client) -# ADD_SUBDIRECTORY(kit) -# ADD_SUBDIRECTORY(plugins) -# ADD_SUBDIRECTORY(sdb) -# ADD_SUBDIRECTORY(mnode) +ADD_SUBDIRECTORY(client) +ADD_SUBDIRECTORY(kit) +ADD_SUBDIRECTORY(plugins) +ADD_SUBDIRECTORY(sdb) +ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) -# ADD_SUBDIRECTORY(dnode) +ADD_SUBDIRECTORY(dnode) #ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 399b4b9920625a766f1bead360237673d539b6f0..7a67d7dbf21e952dced6e04e3f54adc451f7555d 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -26,15 +26,13 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeInitMgmtIp(); -void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); void dnodeSendVnodeCfgMsg(int32_t vnode); void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid); - - #ifdef __cplusplus } #endif diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d8273738c37ca20b738c0afdc635630ec153a38c..1e7af8d094e9895d37a4affc9d31473faf281dd2 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -30,12 +30,10 @@ void (*dnodeInitMgmtIpFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL; void (*dnodeCleanUpMgmtFp)() = NULL; - -void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; +void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; - static void *tsStatusTimer = NULL; static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); @@ -173,13 +171,13 @@ void dnodeCleanUpMgmt() { } } -void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); return; } - dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[msgType], pConn); + dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn); if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 96d0db5f6a0da50d0471d6c17e569c938e53d13d..dd4678802f8129d6b55cc01e12b8145394121691 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -68,38 +68,6 @@ void dnodeCleanUpModules() { } } -void dnodeProcessModuleStatus(uint32_t status) { - if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - return; - } - - int news = status; - int olds = tsModuleStatus; - - for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { - int newStatus = news & (1 << moduleType); - int oldStatus = olds & (1 << moduleType); - - if (oldStatus > 0) { - if (newStatus == 0) { - if (tsModule[moduleType].stopFp) { - dPrint("module:%s is stopped on this node", tsModule[moduleType].name); - (*tsModule[moduleType].stopFp)(); - } - } - } else if (oldStatus == 0) { - if (newStatus > 0) { - if (tsModule[moduleType].startFp) { - dPrint("module:%s is started on this node", tsModule[moduleType].name); - (*tsModule[moduleType].startFp)(); - } - } - } else { - } - } - tsModuleStatus = status; -} - int32_t dnodeInitModules() { for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) { if (tsModule[mod].num != 0 && tsModule[mod].initFp) { diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 67a1d42565dfac1316833208ee1c0e89bddc5293..d6127574c691c9dea8017a9f3f9d4f5b598f88fd 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -33,12 +33,14 @@ #include "dnodeVnodeMgmt.h" #ifdef CLUSTER -//#include "acct.h" -//#include "admin.h" -//#include "cluster.h" -//#include "grant.h" -//#include "replica.h" -//#include "storage.h" +#include "account.h" +#include "admin.h" +#include "balance.h" +#include "cluster.h" +#include "grant.h" +#include "mpeer.h" +#include "storage.h" +#include "vpeer.h" #endif static pthread_mutex_t tsDnodeMutex; @@ -89,8 +91,6 @@ void dnodeCleanUpSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); } - - dnodeCleanupShell(); dnodeCleanUpModules(); dnodeCleanupVnodes(); @@ -112,7 +112,13 @@ void dnodeCheckDataDirOpenned(const char *dir) { void dnodeInitPlugins() { #ifdef CLUSTER - acctInit(); +// acctInit(); +// adminInit(); +// balanceInit(); +// clusterInit(); +// grantInit(); +// mpeerInit(); +// storageInit(); #endif } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 15a6096826f2c0e96e6890956856ad783a592fae..ff893acd38a7ea8c80309d43c2499376155c2dda 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -49,8 +49,6 @@ extern int32_t (*dnodeCheckSystem)(); extern void *tsDnodeMgmtQhandle; void dnodeCheckDataDirOpenned(const char* dir); -void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); - // dnodeModule extern void (*dnodeStartModules)(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 34c8b8c77e4821ebb7a1aab1c4e148ffd3893302..12a3fefe66b757b7f42b9fffa901f236f9455c03 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -264,6 +264,7 @@ void mgmtCleanUpSystem(); void mgmtStopSystem(); void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); #ifdef __cplusplus } diff --git a/src/inc/sdb.h b/src/inc/sdb.h index d0239522a9d2939cdd09374a5ad97937949d2400..4b4de1ac4b9b2575493f289f98660dbbb6704911 100644 --- a/src/inc/sdb.h +++ b/src/inc/sdb.h @@ -25,18 +25,10 @@ extern "C" { extern uint16_t tsMgmtMgmtPort; extern uint16_t tsMgmtSyncPort; -extern int sdbMaxNodes; extern int tsMgmtPeerHBTimer; // seconds -extern char sdbZone[]; -extern char sdbMasterIp[]; -extern char sdbPrivateIp[]; extern char * sdbStatusStr[]; extern char * sdbRoleStr[]; -extern void * mnodeSdb; -extern int sdbExtConns; extern int sdbMaster; -extern uint32_t sdbPublicIp; -extern uint32_t sdbMasterStartTime; extern SRpcIpSet *pSdbIpList; extern SRpcIpSet *pSdbPublicIpList; @@ -89,14 +81,9 @@ typedef struct { // internal int syncFd; void *hbTimer; - void *thandle; void *pSync; } SSdbPeer; -SSdbPeer *sdbAddPeer(uint32_t ip, uint32_t publicIp, char role); - -void sdbUpdateIpList(); - extern SSdbPeer *sdbPeer[]; #define sdbInited (sdbPeer[0]) #define sdbStatus (sdbPeer[0]->status) @@ -130,8 +117,6 @@ int sdbInitPeers(char *directory); void sdbCleanUpPeers(); -int sdbCfgNode(char *cont); - int64_t sdbGetVersion(); int32_t sdbGetRunStatus(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a05bd20f84786f6e5f19e137b41ed559547d36f2..bf031870101f282007b30a5f390bef3e0bf1fe45 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -595,23 +595,11 @@ typedef struct { typedef struct { int32_t code; - int32_t numOfVnodes; SDnodeState dnodeState; SRpcIpSet ipList; SVnodeAccess vnodeAccess[]; } SStatusRsp; -// internal message -typedef struct { - uint32_t destId; - uint32_t destIp; - char tableId[TSDB_UNI_LEN + 1]; - char empty[3]; - uint8_t msgType; - int32_t msgLen; - uint8_t content[0]; -} SIntMsg; - typedef struct { char spi; char encrypt; diff --git a/src/mnode/CMakeLists.txt b/src/mnode/CMakeLists.txt index 6bf4ef34e062b8559a48daf0383e2149a7064a92..acf8b15f2263a35de39ebc9077c0e9372d985948 100644 --- a/src/mnode/CMakeLists.txt +++ b/src/mnode/CMakeLists.txt @@ -14,7 +14,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread) IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(mnode acct) + TARGET_LINK_LIBRARIES(mnode) ENDIF () ENDIF () diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 61331b9b3d3c8d90cfe48550511e44e07a8fbd5e..7a6bb3a9aa3d8d66d636dc069d575a9739e28e8c 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -22,7 +22,6 @@ extern "C" { #include "mnode.h" -void mgmtStartBalanceTimer(int64_t mseconds); int32_t mgmtInitBalance(); void mgmtCleanupBalance(); int32_t mgmtAllocVnodes(SVgObj *pVgroup); diff --git a/src/mnode/inc/mgmtConn.h b/src/mnode/inc/mgmtConn.h deleted file mode 100644 index 62dd5ebb42cf585f4aacc6902c9276b280ab932d..0000000000000000000000000000000000000000 --- a/src/mnode/inc/mgmtConn.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MGMT_CONN_H -#define TDENGINE_MGMT_CONN_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "mnode.h" - -int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); -int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn); - -bool mgmtCheckQhandle(uint64_t qhandle); -void mgmtSaveQhandle(void *qhandle); -void mgmtFreeQhandle(void *qhandle); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 4cdac1e7afd367fedcab36a07f6e36d26a215551..6532c98612c682e79f2ebad2e68a8d4f843f2cc5 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -50,9 +50,16 @@ int32_t mgmtGetDnodesNum(); int32_t mgmtUpdateDnode(SDnodeObj *pDnode); void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode); bool mgmtCheckConfigShow(SGlobalConfig *cfg); +bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode); +bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode); void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); SDnodeObj* mgmtGetDnode(uint32_t ip); +extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip); +extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip); + +void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtGrant.h b/src/mnode/inc/mgmtGrant.h index 1cfc88f94af206c4291e4cf2be7551cffbe77244..01a5068bada043992a43b2c1d0ce0e18ac50f437 100644 --- a/src/mnode/inc/mgmtGrant.h +++ b/src/mnode/inc/mgmtGrant.h @@ -30,6 +30,7 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries); int32_t mgmtCheckTimeSeries(uint32_t timeseries); int32_t mgmtCheckUserGrant(); int32_t mgmtCheckDbGrant(); +int32_t mgmtCheckDnodeGrant(); int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn); diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index d012997d136903d9ae232756a87b38fb54d33010..6e8e91ebc85d4cd84ecb99bc4c87080dc4b06544 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -24,7 +24,10 @@ extern "C" { #include #include "mnode.h" - int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp); +int32_t mgmtRemoveMnode(uint32_t privateIp); + +int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 959f9e65ab93e220567384461de4bc93889be041..5af38a73b8b67a200b7acfb9e9d7b91f877cf05a 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -22,20 +22,22 @@ extern "C" { #include "mnode.h" -int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); - -int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +bool mgmtCheckQhandle(uint64_t qhandle); +void mgmtSaveQhandle(void *qhandle); +void mgmtFreeQhandle(void *qhandle); +int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); +int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); +int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); -int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); +int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtKillQuery(char *qidstr, void *pConn); - int32_t mgmtKillStream(char *qidstr, void *pConn); - int32_t mgmtKillConnection(char *qidstr, void *pConn); enum { diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index f14871b5b205e055724009179c6cb7b5c45053bc..06b0068652f68060bc661ab148f6cadb804a7c9c 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -28,10 +28,8 @@ int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); -extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); /* * If table not exist, will create it diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 1e5fc54c5a82102b5a0dd8db840be8d1ccc0bc63..8bd9cf893393255d0259795bcc89c41b50d7d087 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -77,11 +77,3 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { return 0; } } - -char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { - if (mgmtGetVnodeStatusFp) { - return (*mgmtGetVnodeStatusFp)(pVgroup, pVnode); - } else { - return "master"; - } -} diff --git a/src/mnode/src/mgmtConn.c b/src/mnode/src/mgmtConn.c deleted file mode 100644 index 5d7b8ab27f5da1a7c6af298301094d7812513391..0000000000000000000000000000000000000000 --- a/src/mnode/src/mgmtConn.c +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "mgmtConn.h" -#include "taosmsg.h" -#include "tschemautil.h" - -typedef struct { - char user[TSDB_TABLE_ID_LEN]; - uint64_t stime; - uint32_t ip; - uint16_t port; -} SConnInfo; - -typedef struct { - int numOfConns; - int index; - SConnInfo connInfo[]; -} SConnShow; - -int mgmtGetConns(SShowObj *pShow, void *pConn) { -// SAcctObj * pAcct = pConn->pAcct; -// SConnShow *pConnShow; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); -// pConnShow->index = 0; -// pConnShow->numOfConns = 0; -// -// if (pAcct->acctInfo.numOfConns > 0) { -// pConn = pAcct->pConn; -// SConnInfo *pConnInfo = pConnShow->connInfo; -// -// while (pConn && pConn->pUser) { -// strcpy(pConnInfo->user, pConn->pUser->user); -// pConnInfo->ip = pConn->ip; -// pConnInfo->port = pConn->port; -// pConnInfo->stime = pConn->stime; -// -// pConnShow->numOfConns++; -// pConnInfo++; -// pConn = pConn->next; -// } -// } -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// // sorting based on useconds -// -// pShow->pNode = pConnShow; - - return 0; -} - -int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - int cols = 0; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; - SSchema *pSchema = tsGetSchema(pMeta); - - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip:port"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "login time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - - pShow->numOfRows = 1000000; - pShow->pNode = NULL; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - - mgmtGetConns(pShow, pConn); - return 0; -} - -int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) { - int numOfRows = 0; - char *pWrite; - int cols = 0; - - SConnShow *pConnShow = (SConnShow *)pShow->pNode; - - if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; - - while (numOfRows < rows) { - SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->user); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pNode->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; - cols++; - - numOfRows++; - pConnShow->index++; - } - - if (numOfRows == 0) { - tfree(pConnShow); - } - - pShow->numOfReads += numOfRows; - return numOfRows; -} - -bool mgmtCheckQhandle(uint64_t qhandle) { - return true; -} - -void mgmtSaveQhandle(void *qhandle) { -} - -void mgmtFreeQhandle(void *qhandle) { -} \ No newline at end of file diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index bedd51dcffa23cc791b301acc708778b7db60490..0186f51e16e001794c315c1026be24cc13f76512 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { // // SVgObj *pVgroup = pDb->pHead; // while (pVgroup != NULL) { -// mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); +// balanceUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); // if (oldReplicaNum < pDb->cfg.replications) { -// if (!mgmtAddVnode(pVgroup, NULL, NULL)) { +// if (!balanceAddVnode(pVgroup, NULL, NULL)) { // mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); // code = TSDB_CODE_NO_ENOUGH_DNODES; // } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 20043c380081f86fbc025f2ce849cb4d9157c0da..a9c784eca9573584f0aa5ec9df30d8da09016711 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -33,6 +33,9 @@ void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL; int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL; +int32_t (*mgmtCreateDnodeFp)(uint32_t ip) = NULL; +int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip) = NULL; + static SDnodeObj tsDnodeObj = {0}; @@ -606,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { if (mgmtGetScoresMetaFp) { + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; return mgmtGetScoresMetaFp(pMeta, pShow, pConn); } else { return TSDB_CODE_OPS_NOT_SUPPORT; @@ -633,3 +639,17 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg) { return false; return true; } + +/** + * check if a dnode in remove state + **/ +bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode) { + return pDnode->lbStatus == TSDB_DN_LB_STATUS_OFFLINE_REMOVING || pDnode->lbStatus == TSDB_DN_LB_STATE_SHELL_REMOVING; +} + +/** + * check if a dnode in offline state + **/ +bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) { + return pDnode->status == TSDB_DN_STATUS_OFFLINE; +} diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 5a365f220a7098257ac7ea5e1273c17970671eaf..100e76b10bcb04a3b84bacc69b27ccb81cb506ca 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); pDnode->customScore = score; mgmtUpdateDnode(pDnode); - mgmtStartBalanceTimer(15); + //mgmtStartBalanceTimer(15); } return TSDB_CODE_INVALID_SQL; } else if (strncasecmp(option, "bandwidth", 9) == 0) { diff --git a/src/mnode/src/mgmtGrant.c b/src/mnode/src/mgmtGrant.c index c24fb82aa6e3ccfb40cbecfd79548a7b258edc8d..0ea212b86f8438ec9fe5c0adb1b8ea6f04c79499 100644 --- a/src/mnode/src/mgmtGrant.c +++ b/src/mnode/src/mgmtGrant.c @@ -22,6 +22,7 @@ int32_t (*mgmtCheckUserGrantFp)() = NULL; int32_t (*mgmtCheckDbGrantFp)() = NULL; +int32_t (*mgmtCheckDnodeGrantFp)() = NULL; void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL; @@ -32,7 +33,7 @@ void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL; int32_t mgmtCheckUserGrant() { if (mgmtCheckUserGrantFp) { - return mgmtCheckUserGrantFp(); + return (*mgmtCheckUserGrantFp)(); } else { return 0; } @@ -40,7 +41,15 @@ int32_t mgmtCheckUserGrant() { int32_t mgmtCheckDbGrant() { if (mgmtCheckDbGrantFp) { - return mgmtCheckDbGrantFp(); + return (*mgmtCheckDbGrantFp)(); + } else { + return 0; + } +} + +int32_t mgmtCheckDnodeGrant() { + if (mgmtCheckDnodeGrantFp) { + return (*mgmtCheckDnodeGrantFp)(); } else { return 0; } @@ -49,20 +58,20 @@ int32_t mgmtCheckDbGrant() { void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; if (mgmtAddTimeSeriesFp) { - mgmtAddTimeSeriesFp(timeSeriesNum); + (*mgmtAddTimeSeriesFp)(timeSeriesNum); } } void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; if (mgmtRestoreTimeSeriesFp) { - mgmtRestoreTimeSeriesFp(timeSeriesNum); + (*mgmtRestoreTimeSeriesFp)(timeSeriesNum); } } int32_t mgmtCheckTimeSeries(uint32_t timeseries) { if (mgmtCheckTimeSeriesFp) { - return mgmtCheckTimeSeriesFp(timeseries); + return (*mgmtCheckTimeSeriesFp)(timeseries); } else { return 0; } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 1c60312f3eebb0bd0e8909f4e7b4aae003070fae..b3b70d1d2e7dfa308d8bde2c706821824d9207bc 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -18,12 +18,48 @@ #include "mgmtMnode.h" #include "mgmtUser.h" -void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; -int32_t (*mgmtInitMnodesFp)() = NULL; +int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL; +int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL; int32_t (*mgmtGetMnodesNumFp)() = NULL; +void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; -static int32_t mgmtGetMnodesNum(); -static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode); +int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp) { + if (mgmtAddMnodeFp) { + return (*mgmtAddMnodeFp)(privateIp, publicIp); + } else { + return 0; + } +} + +int32_t mgmtRemoveMnode(uint32_t privateIp) { + if (mgmtRemoveMnodeFp) { + return (*mgmtRemoveMnodeFp)(privateIp); + } else { + return 0; + } +} + +static int32_t mgmtGetMnodesNum() { + if (mgmtGetMnodesNumFp) { + return (*mgmtGetMnodesNumFp)(); + } else { + return 1; + } +} + +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { + if (mgmtGetNextMnodeFp) { + return (*mgmtGetNextMnodeFp)(pShow, pMnode); + } else { + if (*pMnode == NULL) { + *pMnode = NULL; + } else { + *pMnode = NULL; + } + } + + return *pMnode; +} int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; @@ -88,11 +124,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon char ipstr[20]; while (numOfRows < rows) { - pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode); - - -// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode); -// if (pMnode == NULL) break; + pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode); + if (pMnode == NULL) break; cols = 0; @@ -123,25 +156,3 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon pShow->numOfReads += numOfRows; return numOfRows; } - -static int32_t mgmtGetMnodesNum() { - if (mgmtGetMnodesNumFp) { - return mgmtGetMnodesNumFp(); - } else { - return 1; - } -} - -static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { - if (mgmtGetNextMnodeFp) { - return mgmtGetNextMnodeFp(pShow, pMnode); - } else { - if (*pMnode == NULL) { - *pMnode = NULL; - } else { - *pMnode = NULL; - } - } - - return *pMnode; -} \ No newline at end of file diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 6e9a6e9c07f6ce2b130aba3116fa2f6c8e01a311..d8cc5af06b4cbbf9546d227314bd18c8edba4745 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -15,16 +15,27 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mnode.h" -#include "mgmtProfile.h" #include "taosmsg.h" #include "tschemautil.h" +#include "mgmtProfile.h" + +typedef struct { + char user[TSDB_TABLE_ID_LEN + 1]; + uint64_t stime; + uint32_t ip; + uint16_t port; +} SConnInfo; + +typedef struct { + int numOfConns; + int index; + SConnInfo connInfo[]; +} SConnShow; typedef struct { uint32_t ip; uint16_t port; - char user[TSDB_TABLE_ID_LEN]; + char user[TSDB_TABLE_ID_LEN+ 1]; } SCDesc; typedef struct { @@ -532,3 +543,123 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) { return TSDB_CODE_INVALID_CONNECTION; } + +bool mgmtCheckQhandle(uint64_t qhandle) { + return true; +} + +void mgmtSaveQhandle(void *qhandle) { +} + +void mgmtFreeQhandle(void *qhandle) { +} + +int mgmtGetConns(SShowObj *pShow, void *pConn) { + // SAcctObj * pAcct = pConn->pAcct; + // SConnShow *pConnShow; + // + // pthread_mutex_lock(&pAcct->mutex); + // + // pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); + // pConnShow->index = 0; + // pConnShow->numOfConns = 0; + // + // if (pAcct->acctInfo.numOfConns > 0) { + // pConn = pAcct->pConn; + // SConnInfo *pConnInfo = pConnShow->connInfo; + // + // while (pConn && pConn->pUser) { + // strcpy(pConnInfo->user, pConn->pUser->user); + // pConnInfo->ip = pConn->ip; + // pConnInfo->port = pConn->port; + // pConnInfo->stime = pConn->stime; + // + // pConnShow->numOfConns++; + // pConnInfo++; + // pConn = pConn->next; + // } + // } + // + // pthread_mutex_unlock(&pAcct->mutex); + // + // // sorting based on useconds + // + // pShow->pNode = pConnShow; + + return 0; +} + +int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + int32_t cols = 0; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + SSchema *pSchema = tsGetSchema(pMeta); + + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "login time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + + pShow->numOfRows = 1000000; + pShow->pNode = NULL; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + mgmtGetConns(pShow, pConn); + return 0; +} + +int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + char *pWrite; + int32_t cols = 0; + + SConnShow *pConnShow = (SConnShow *)pShow->pNode; + + if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; + + while (numOfRows < rows) { + SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, pNode->user); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + uint32_t ip = pNode->ip; + sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pNode->stime; + cols++; + + numOfRows++; + pConnShow->index++; + } + + if (numOfRows == 0) { + tfree(pConnShow); + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 2aadd8963c846fb00ae7624c131a0da10a63c4f0..a889c16f756b6090c67255e06d46af9c25b822d9 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -26,7 +26,6 @@ #include "mgmtAcct.h" #include "mgmtBalance.h" #include "mgmtChildTable.h" -#include "mgmtConn.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtDnodeInt.h" @@ -52,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -uint32_t mgmtAccessSquence = 0; void *tsShellConnServer = NULL; void mgmtProcessTranRequest(SSchedMsg *sched) { @@ -1164,10 +1162,8 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); } -void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) { if (!mgmtAlterAcctFp) { @@ -1297,6 +1293,76 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle rpcSendResponse(ahandle, code, NULL, 0); } +static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtCreateDnodeFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCont; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("failed to create dnode:%s, redirect this message", pCreate->ip); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_INVALID_USER)); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = (*mgmtCreateDnodeFp)(inet_addr(pCreate->ip)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%s is created by %s", pCreate->ip, pUser->user); + } else { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + +static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtDropDnodeByIpFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCont; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("failed to drop dnode:%s, redirect this message", pDrop->ip); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_INVALID_USER)); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = (*mgmtDropDnodeByIpFp)(inet_addr(pDrop->ip)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%s set to removing state by %s", pDrop->ip, pUser->user); + } else { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index e36788a5ddb79896d9e89a958bdcba2999af784a..bcb5f64a7b626e322038d809073338e0f0f091ad 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() { tsetModuleStatus(TSDB_MOD_MGMT); -// strcpy(sdbMasterIp, mgmtIpStr[0]); -// strcpy(sdbPrivateIp, tsPrivateIp); -// sdbPublicIp = inet_addr(tsPublicIp); - return 0; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 10b4244bf861fe06235048a92f949791ef440b11..b0ff80a8193c68a99a35a639348df54c12962e35 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return 0; } +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { + SDnodeObj *pDnode = mgmtGetDnode(pVnode->ip); + if (pDnode == NULL) { + mError("dnode:%s, vgroup:%d, vnode:%d dnode not exist", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode); + return "null"; + } + + if (pDnode->status == TSDB_DN_STATUS_OFFLINE) { + return "offline"; + } + + SVnodeLoad *vload = pDnode->vload + pVnode->vnode; + if (vload->vgId != pVgroup->vgId || vload->vnode != pVnode->vnode) { + mError("dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d", + taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode, vload->vgId, vload->vnode); + return "null"; + } + + return (char*)taosGetVnodeStatusStr(vload->status); +} + int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SVgObj *pVgroup = NULL; diff --git a/src/plugins/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt index 9fd299995711f8ed98052db946e5d00bda714deb..7044f5d09d0f1fe2930e4d998af0d970d76f4655 100644 --- a/src/plugins/http/CMakeLists.txt +++ b/src/plugins/http/CMakeLists.txt @@ -13,6 +13,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(http taos_static z) IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(http http_admin) + TARGET_LINK_LIBRARIES(http) ENDIF () ENDIF () diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 41b712048f301bbcbd43ae89dff3c5c0198d5398..aa66af9825e002c341ff719594b0d1c42efef877 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -34,13 +34,21 @@ #include "tgHandle.h" #include "tlog.h" -#ifdef CLUSTER - void adminInitHandle(HttpServer* pServer); - void opInitHandle(HttpServer* pServer); -#else - void adminInitHandle(HttpServer* pServer) {} - void opInitHandle(HttpServer* pServer) {} -#endif + +void (*adminInitHandleFp)(HttpServer* pServer) = NULL; +void (*opInitHandleFp)(HttpServer* pServer) = NULL; + +void adminInitHandle(HttpServer* pServer) { + if (adminInitHandleFp) { + (*adminInitHandleFp)(pServer); + } +} + +void opInitHandle(HttpServer* pServer) { + if (opInitHandleFp) { + (*opInitHandleFp)(pServer); + } +} static HttpServer *httpServer = NULL; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); diff --git a/src/sdb/CMakeLists.txt b/src/sdb/CMakeLists.txt index b0617353d9e54c40257eb29b68eb0c1ac16a32d3..47ea6e15b8cf5d8ba6f5d24c562e2b133f58f5d0 100644 --- a/src/sdb/CMakeLists.txt +++ b/src/sdb/CMakeLists.txt @@ -11,6 +11,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ADD_LIBRARY(sdb ${SRC}) TARGET_LINK_LIBRARIES(sdb trpc) IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(sdb mreplica) + TARGET_LINK_LIBRARIES(sdb) ENDIF() ENDIF () diff --git a/src/sdb/inc/sdbint.h b/src/sdb/inc/sdbint.h index d0977f2e2fc18d72bfed79d8e47e9d3faf85bd34..30ebe0909c83f5fdefd6105e31f0ecd467d1b77d 100644 --- a/src/sdb/inc/sdbint.h +++ b/src/sdb/inc/sdbint.h @@ -48,6 +48,21 @@ #define sdbPrint(...) \ { tprintf("MND-SDB ", 255, __VA_ARGS__); } +#define mpeerError(...) \ + if (sdbDebugFlag & DEBUG_ERROR) { \ + tprintf("ERROR MND-MPEER ", 255, __VA_ARGS__); \ + } +#define mpeerWarn(...) \ + if (sdbDebugFlag & DEBUG_WARN) { \ + tprintf("WARN MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \ + } +#define mpeerTrace(...) \ + if (sdbDebugFlag & DEBUG_TRACE) { \ + tprintf("MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \ + } +#define mpeerPrint(...) \ + { tprintf("MND-MPEER ", 255, __VA_ARGS__); } + #define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__) #define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__) #define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__) @@ -69,12 +84,7 @@ typedef struct { char *row; } SSdbUpdate; -typedef struct { - char numOfTables; - uint64_t version[]; -} SSdbSync; - -typedef struct { +typedef struct _SSdbTable { SSdbHeader header; int maxRows; int dbId; @@ -109,23 +119,6 @@ typedef struct { char data[]; } SRowHead; -typedef struct { - char * buffer; - char * offset; - int trans; - int bufferSize; - pthread_mutex_t qmutex; -} STranQueue; - -typedef struct { - char status; - char role; - char numOfMnodes; - uint64_t dbVersion; - uint32_t numOfDnodes; - uint32_t publicIp; -} SMnodeStatus; - typedef struct { uint8_t dbId; char type; @@ -140,7 +133,7 @@ extern int sdbNumOfTables; extern int64_t sdbVersion; int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); -int sdbRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); +int mpeerRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); void sdbResetTable(SSdbTable *pTable); extern const int16_t sdbFileVersion; diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index 4b000a30ebd5406809dbe7d37575236d7af08162..fbc41089d1c9917aa5adae26b0052450a44e844d 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -23,7 +23,6 @@ extern char version[]; const int16_t sdbFileVersion = 0; -int sdbExtConns = 0; SRpcIpSet *pSdbIpList = NULL; SRpcIpSet *pSdbPublicIpList = NULL; SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self diff --git a/src/sdb/src/sdbstr.c b/src/sdb/src/sdbstr.c index 90bf2a3d43cc8d998111480efc3f5061d58b7516..59c01eb15a95a7354b5f37acb1d521a5e89c100d 100644 --- a/src/sdb/src/sdbstr.c +++ b/src/sdb/src/sdbstr.c @@ -12,32 +12,47 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - +#define _DEFAULT_SOURCE #include "sdbint.h" -char* sdbStatusStr[] = {"offline", "unsynced", "syncing", "serving", "null"}; - -char* sdbRoleStr[] = {"unauthed", "undecided", "master", "slave", "null"}; - -#ifndef CLUSTER - -/* - * Lite Version sync request is always successful - */ -int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { - return 0; +int32_t (*mpeerInitMnodesFp)(char *directory) = NULL; +void (*mpeerCleanUpMnodesFp)() = NULL; +int32_t (*mpeerForwardRequestFp)(SSdbTable *pTable, char type, void *cont, int32_t contLen) = NULL; + +char *sdbStatusStr[] = { + "offline", + "unsynced", + "syncing", + "serving", + "null" +}; + +char *sdbRoleStr[] = { + "unauthed", + "undecided", + "master", + "slave", + "null" +}; + +int32_t sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int32_t dataLen) { + if (mpeerForwardRequestFp) { + return mpeerForwardRequestFp(pTable, type, data, dataLen); + } else { + return 0; + } } -/* - * Lite Version does not need to initialize peers - */ -int sdbInitPeers(char *directory) { - return 0; +int32_t sdbInitPeers(char *directory) { + if (mpeerInitMnodesFp) { + return (*mpeerInitMnodesFp)(directory); + } else { + return 0; + } } -/* - * Lite Version does not need to cleanup peers - */ -void sdbCleanUpPeers(){} - -#endif \ No newline at end of file +void sdbCleanUpPeers() { + if (mpeerCleanUpMnodesFp) { + (*mpeerCleanUpMnodesFp)(); + } +}