diff --git a/src/dnode/inc/dnodeSystem.h b/src/dnode/inc/dnodeSystem.h index 15dcdbb39dcf80e2d9b312dea55050711e18cb39..d2c29845997b591b53b70eb31b3793e3a72dd7a1 100644 --- a/src/dnode/inc/dnodeSystem.h +++ b/src/dnode/inc/dnodeSystem.h @@ -38,6 +38,7 @@ extern int32_t tsMaxQueues; extern void ** tsRpcQhandle; extern void *tsQueryQhandle; extern void *tsDnodeMgmtQhandle; +extern void *tsDnodeTmr; int32_t dnodeInitSystem(); void dnodeCleanUpSystem(); diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h index 504439fc7ea6da0957860c059bc33e90949cdee5..a60d74425b673a6a6fb426f04d57627a1b68c484 100644 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ b/src/dnode/inc/dnodeVnodeMgmt.h @@ -58,6 +58,8 @@ int32_t dnodeDropVnode(int32_t vnode); //tsdb_repo_t* dnodeGetVnode(int vid); void* dnodeGetVnode(int32_t vnode); +int32_t dnodeGetVnodesNum(); + /* * get the status of vnode */ diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index e1e7df07af2cb15dcd37d12e1ab448c6cc9e6851..c5934bcdc4b06f8eea97bdb2f79b56753f8b8df7 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -29,10 +29,14 @@ void (*dnodeInitMgmtIpFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL; +void (*dnodeCleanUpMgmtFp)() = NULL; + void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; + +static void *tsStatusTimer = NULL; static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); @@ -86,12 +90,71 @@ void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, } } +void dnodeSendStatusMsgToMgmt(void *handle, void *tmrId) { + taosTmrReset(dnodeSendStatusMsgToMgmt, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + if (tsStatusTimer == NULL) { + dError("Failed to start status timer"); + return; + } + + int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); + SStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + dError("Failed to malloc status message"); + return; + } + + int32_t totalVnodes = dnodeGetVnodesNum(); + + pStatus->version = htonl(tsVersion); + pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); + pStatus->publicIp = htonl(inet_addr(tsPublicIp)); + pStatus->lastReboot = htonl(tsRebootTime); + pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); + pStatus->openVnodes = htons((uint16_t) totalVnodes); + pStatus->numOfCores = htons((uint16_t) tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + pStatus->alternativeRole = (uint8_t) tsAlternativeRole; + + SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load; + + //TODO loop all vnodes +// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) { +// if (vnodeList[vnode].cfg.maxSessions <= 0) continue; +// +// SVnodeObj *pVnode = vnodeList + vnode; +// pLoad->vnode = htonl(vnode); +// pLoad->vgId = htonl(pVnode->cfg.vgId); +// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus; +// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus; +// pLoad->accessState = (uint8_t)(pVnode->accessState); +// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage); +// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage); +// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) { +// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten); +// } else { +// pLoad->pointsWritten = htobe64(0); +// } +// pLoad++; +// +// if (++count >= tsOpenVnodes) { +// break; +// } +// } + + dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen); + + //grantSendMsgToMgmt(); +} + + int32_t dnodeInitMgmt() { if (dnodeInitMgmtFp) { dnodeInitMgmtFp(); } dnodeInitProcessShellMsg(); + taosTmrReset(dnodeSendStatusMsgToMgmt, 500, NULL, tsDnodeTmr, &tsStatusTimer); return 0; } @@ -101,6 +164,17 @@ void dnodeInitMgmtIp() { } } +void dnodeCleanUpMgmt() { + if (tsStatusTimer != NULL) { + taosTmrStopA(&tsStatusTimer); + tsStatusTimer = NULL; + } + + if (dnodeCleanUpMgmtFp) { + dnodeCleanUpMgmtFp(); + } +} + void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 71e8f47e1249ba71425765f9184e13514cba753b..23962ec075b4d2fe1e7833ec4387d0af8fde7550 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -33,12 +33,12 @@ #include "dnodeVnodeMgmt.h" #ifdef CLUSTER -#include "acct.h" -#include "admin.h" -#include "cluster.h" -#include "grant.h" -#include "replica.h" -#include "storage.h" +//#include "acct.h" +//#include "admin.h" +//#include "cluster.h" +//#include "grant.h" +//#include "replica.h" +//#include "storage.h" #endif static pthread_mutex_t tsDnodeMutex; @@ -48,8 +48,7 @@ static int32_t dnodeInitRpcQHandle(); static int32_t dnodeInitQueryQHandle(); static int32_t dnodeInitTmrCtl(); -void *tsStatusTimer = NULL; -void *vnodeTmrCtrl; +void *tsDnodeTmr; void **tsRpcQhandle; void *tsDnodeMgmtQhandle; void *tsQueryQhandle; @@ -90,10 +89,7 @@ void dnodeCleanUpSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); } - if (tsStatusTimer != NULL) { - taosTmrStopA(&tsStatusTimer); - tsStatusTimer = NULL; - } + dnodeCleanupShell(); dnodeCleanUpModules(); @@ -259,15 +255,15 @@ static int32_t dnodeInitQueryQHandle() { int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode; dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads); - tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); + tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", tsDnodeTmr); return 0; } static int32_t dnodeInitTmrCtl() { - vnodeTmrCtrl = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, + tsDnodeTmr = taosTmrInit(TSDB_MAX_VNODES * (tsVnodePeers + 10) + tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode"); - if (vnodeTmrCtrl == NULL) { + if (tsDnodeTmr == NULL) { dError("failed to init timer, exit"); return -1; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 45ff014dc51fa92d1abfb335751d3c44f61246a0..c69e1d2b26534654c390c2e41fa2d7cd987b004b 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -48,17 +48,14 @@ extern int32_t (*dnodeCheckSystem)(); // dnodeSystem extern void *tsDnodeMgmtQhandle; +void dnodeCheckDataDirOpenned(const char* dir); void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); // dnodeModule extern void (*dnodeStartModules)(); -// multilevelStorage -extern int32_t (*dnodeInitStorage)(); -extern void (*dnodeCleanupStorage)(); -void dnodeCheckDataDirOpenned(const char* dir); void dnodeLockVnodes(); void dnodeUnLockVnodes(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index ca6294f623f6a6bdc5e6981068cd0690d0aef153..7de97a21f88490bafc21707608d43a2d543b06a9 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -260,12 +260,10 @@ typedef struct { //mgmtSystem int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); -void mgmtProcessMsgFromDnode(char msgType, void *pCont, int contLen, void *pConn, int32_t code); -extern int32_t (*mgmtInitSystem)(); -extern void (*mgmtStopSystem)(); extern void (*mgmtCleanUpRedirect)(); +void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); #ifdef __cplusplus } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 6861d31b873a7898ae1e9812ade92cb25ff0c543..0e2ad607b0f6a9a4680e92f015fd34f57ca677c8 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -163,6 +163,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismat TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG_VERSION, 0, 122, "invalid version of message") +TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 123, "dnode not exist") #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f74bff7e57fa5a0e9773263d23ea536068ef866e..259774863ea8044455ef3833d5c18917c43533da 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -572,33 +572,35 @@ typedef struct { char reserved[64]; } SVnodeStatisticInfo; +typedef struct { + uint32_t moduleStatus; + uint32_t createdTime; + uint32_t numOfVnodes; + uint32_t reserved; +} SDnodeState; + typedef struct { uint32_t version; + uint32_t privateIp; uint32_t publicIp; - uint32_t lastReboot; // time stamp for last reboot + uint32_t lastReboot; // time stamp for last reboot + uint16_t numOfTotalVnodes; // from config file + uint16_t openVnodes; uint16_t numOfCores; + float diskAvailable; // GB uint8_t alternativeRole; - uint8_t reserve; - uint16_t numOfTotalVnodes; // from config file - uint16_t unused; - float diskAvailable; // GB - uint32_t openVnodes; - char reserved[16]; + uint8_t reserve[15]; SVnodeLoad load[]; } SStatusMsg; typedef struct { - int32_t code; - SRpcIpSet ipList; + int32_t code; + int32_t numOfVnodes; + SDnodeState dnodeState; + SRpcIpSet ipList; + SVnodeAccess vnodeAccess[]; } SStatusRsp; -typedef struct { - uint32_t moduleStatus; - uint32_t createdTime; - uint32_t numOfVnodes; - uint32_t reserved; -} SDnodeState; - // internal message typedef struct { uint32_t destId; diff --git a/src/mnode/inc/mgmtAcct.h b/src/mnode/inc/mgmtAcct.h index edc30409d6e4bf1f4b75fdeaa9f71ad935f8c617..751eea0d268a27d203062c352b776b83a17f60fc 100644 --- a/src/mnode/inc/mgmtAcct.h +++ b/src/mnode/inc/mgmtAcct.h @@ -27,14 +27,17 @@ int32_t mgmtRemoveDbFromAcct(SAcctObj *pAcct, SDbObj *pDb); int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser); int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser); -extern int32_t (*mgmtInitAccts)(); -extern void (*mgmtCleanUpAccts)(); -extern SAcctObj* (*mgmtGetAcct)(char *acctName); -extern int32_t (*mgmtCheckUserLimit)(SAcctObj *pAcct); -extern int32_t (*mgmtCheckDbLimit)(SAcctObj *pAcct); -extern int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate); -extern int32_t (*mgmtGetAcctMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn); +int32_t mgmtInitAccts(); +void mgmtCleanUpAccts(); +SAcctObj* mgmtGetAcct(char *acctName); + +int32_t mgmtCheckUserLimit(SAcctObj *pAcct); +int32_t mgmtCheckDbLimit(SAcctObj *pAcct); +int32_t mgmtCheckTableLimit(SAcctObj *pAcct, SCreateTableMsg *pCreate); +int32_t mgmtGetAcctMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveAccts(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +void mgmtDoStatistic(void *handle, void *tmrId); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 6159d5e5dca0336fa979295da79d04688600e553..15e3a8550aa45f479d088404e7095d0f8ef0b7b5 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -24,10 +24,6 @@ extern "C" { #include #include "mnode.h" -int32_t mgmtCreateDnode(uint32_t ip); -int32_t mgmtDropDnode(SDnodeObj *pDnode); -int32_t mgmtDropDnodeByIp(uint32_t ip); -int32_t mgmtGetNextVnode(SVnodeGid *pVnodeGid); void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId); void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes); int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); @@ -44,19 +40,17 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -extern int32_t (*mgmtInitDnodes)(); -extern void (*mgmtCleanUpDnodes)(); -extern SDnodeObj* (*mgmtGetDnode)(uint32_t ip); -extern int32_t (*mgmtGetDnodesNum)(); -extern void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode); -extern int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode); -extern void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode); -extern int32_t (*mgmtGetScoresMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg); - -extern SDnodeObj tsDnodeObj; - +int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +int32_t mgmtInitDnodes(); +void mgmtCleanUpDnodes(); +int32_t mgmtGetDnodesNum(); +int32_t mgmtUpdateDnode(SDnodeObj *pDnode); +void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode); +bool mgmtCheckConfigShow(SGlobalConfig *cfg); +void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); +SDnodeObj* mgmtGetDnode(uint32_t ip); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 772c2ba3310366c0006e7d1ce76a2e35652c0cc0..d3f5cffac3074be4921baaae28e52e0fc9a501ed 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -34,12 +34,12 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle); -extern int32_t (*mgmtInitDnodeInt)(); -extern void (*mgmtCleanUpDnodeInt)(); -extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); +int32_t mgmtInitDnodeInt(); +void mgmtCleanUpDnodeInt(); void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle); void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); +void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtGrant.h b/src/mnode/inc/mgmtGrant.h index e68e6ae71e31e6d961cf3f3a2ce352cb9a46d16f..2e7122f6190fc248e57d1fa6a508bf596467a15c 100644 --- a/src/mnode/inc/mgmtGrant.h +++ b/src/mnode/inc/mgmtGrant.h @@ -17,21 +17,21 @@ #define TDENGINE_MGMT_GTANT_H #ifdef __cplusplus -extern "C" { +"C" { #endif #include #include #include "mnode.h" -extern bool (*mgmtCheckExpired)(); -extern void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum); -extern void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeseries); -extern int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries); -extern int32_t (*mgmtCheckUserGrant)(); -extern int32_t (*mgmtCheckDbGrant)(); -extern int32_t (*mgmtGetGrantsMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn); +bool mgmtCheckExpired(); +void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum); +void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries); +int32_t mgmtCheckTimeSeries(uint32_t timeseries); +int32_t mgmtCheckUserGrant(); +int32_t mgmtCheckDbGrant(); +int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int rows, void *pConn); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index e27296de7745abc83ef8adecbfca49744f661cbc..d012997d136903d9ae232756a87b38fb54d33010 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -24,8 +24,8 @@ extern "C" { #include #include "mnode.h" -extern int32_t (*mgmtGetMnodeMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn); -extern int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn); + int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtSystem.h b/src/mnode/inc/mgmtSystem.h index 0ee119043a3d434a7d3772bf5d35c866605e4749..2ea052287d908f28c036c520403edda084dd34bb 100644 --- a/src/mnode/inc/mgmtSystem.h +++ b/src/mnode/inc/mgmtSystem.h @@ -22,13 +22,13 @@ extern "C" { #include +int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); +void mgmtStopSystem(); + + -extern int32_t (*mgmtInitSystem)(); -extern int32_t (*mgmtCheckMgmtRunning)(); -extern void (*mgmtDoStatistic)(void *handle, void *tmrId); -extern void (*mgmtStopSystem)(); extern void (*mgmtCleanUpRedirect)(); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index 15db1680d47d803c0fa0c2eb52ede5580a245659..2c57506d5315cd3a0b9a9260559cfe8fce380fe7 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -26,6 +26,16 @@ extern void *tsUserSdb; extern void *tsDbSdb; static SAcctObj tsAcctObj; +int32_t (*mgmtInitAcctsFp)() = NULL; +void (*mgmtCleanUpAcctsFp)() = NULL; +SAcctObj *(*mgmtGetAcctFp)(char *acctName) = NULL; +int32_t (*mgmtCheckUserLimitFp)(SAcctObj *pAcct) = NULL; +int32_t (*mgmtCheckDbLimitFp)(SAcctObj *pAcct) = NULL; +int32_t (*mgmtCheckTableLimitFp)(SAcctObj *pAcct, SCreateTableMsg *pCreate) = NULL; +int32_t (*mgmtGetAcctMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; +int32_t (*mgmtRetrieveAcctsFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; +void (*mgmtDoStatisticFp)(void *handle, void *tmrId) = NULL; + int32_t mgmtAddDbIntoAcct(SAcctObj *pAcct, SDbObj *pDb) { pthread_mutex_lock(&pAcct->mutex); pDb->next = pAcct->pHead; @@ -97,73 +107,95 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) { return 0; } -int32_t mgmtInitAcctsImp() { - SAcctObj *pAcct = &tsAcctObj; - pAcct->acctId = 0; - strcpy(pAcct->user, "root"); - return 0; +int32_t mgmtInitAccts() { + if (mgmtInitAcctsFp) { + return mgmtInitAcctsFp(); + } else { + SAcctObj *pAcct = &tsAcctObj; + pAcct->acctId = 0; + strcpy(pAcct->user, "root"); + return 0; + } } -int32_t (*mgmtInitAccts)() = mgmtInitAcctsImp; - -static SAcctObj *mgmtGetAcctImp(char *acctName) { - return &tsAcctObj; +SAcctObj *mgmtGetAcct(char *acctName) { + if (mgmtGetAcctFp) { + return mgmtGetAcctFp(acctName); + } else { + return &tsAcctObj; + } } -SAcctObj *(*mgmtGetAcct)(char *acctName) = mgmtGetAcctImp; - -static int32_t mgmtCheckUserLimitImp(SAcctObj *pAcct) { - int32_t numOfUsers = sdbGetNumOfRows(tsUserSdb); - if (numOfUsers >= tsMaxUsers) { - mWarn("numOfUsers:%d, exceed tsMaxUsers:%d", numOfUsers, tsMaxUsers); - return TSDB_CODE_TOO_MANY_USERS; +int32_t mgmtCheckUserLimit(SAcctObj *pAcct) { + if (mgmtCheckUserLimitFp) { + return mgmtCheckUserLimitFp(pAcct); + } else { + int32_t numOfUsers = sdbGetNumOfRows(tsUserSdb); + if (numOfUsers >= tsMaxUsers) { + mWarn("numOfUsers:%d, exceed tsMaxUsers:%d", numOfUsers, tsMaxUsers); + return TSDB_CODE_TOO_MANY_USERS; + } + return 0; } - return 0; } -int32_t (*mgmtCheckUserLimit)(SAcctObj *pAcct) = mgmtCheckUserLimitImp; - -static int32_t mgmtCheckDbLimitImp(SAcctObj *pAcct) { - int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb); - if (numOfDbs >= tsMaxDbs) { - mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); - return TSDB_CODE_TOO_MANY_DATABASES; +int32_t mgmtCheckDbLimit(SAcctObj *pAcct) { + if (mgmtCheckDbLimitFp) { + return mgmtCheckDbLimitFp(pAcct); + } else { + int32_t numOfDbs = sdbGetNumOfRows(tsDbSdb); + if (numOfDbs >= tsMaxDbs) { + mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); + return TSDB_CODE_TOO_MANY_DATABASES; + } + return 0; } - return 0; } -int32_t (*mgmtCheckDbLimit)(SAcctObj *pAcct) = mgmtCheckDbLimitImp; - -static int32_t mgmtCheckTableLimitImp(SAcctObj *pAcct, SCreateTableMsg *pCreate) { - return 0; +int32_t mgmtCheckTableLimit(SAcctObj *pAcct, SCreateTableMsg *pCreate) { + if (mgmtCheckTableLimitFp) { + return mgmtCheckTableLimitFp(pAcct, pCreate); + } else { + return 0; + } } -int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate) = mgmtCheckTableLimitImp; - -static void mgmtCleanUpAcctsImp() { +void mgmtCleanUpAccts() { + if (mgmtCleanUpAcctsFp) { + mgmtCleanUpAcctsFp(); + } } -void (*mgmtCleanUpAccts)() = mgmtCleanUpAcctsImp; - -static int32_t mgmtGetAcctMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - return TSDB_CODE_OPS_NOT_SUPPORT; +int32_t mgmtGetAcctMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + if (mgmtGetAcctMetaFp) { + return mgmtGetAcctMetaFp(pMeta, pShow, pConn); + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } } -int32_t (*mgmtGetAcctMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetAcctMetaImp; - -static int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - return 0; +int32_t mgmtRetrieveAccts(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + if (mgmtRetrieveAcctsFp) { + return mgmtRetrieveAcctsFp(pShow, data, rows, pConn); + } else { + return 0; + } } -int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveAcctsImp; - SAcctObj *mgmtGetAcctFromConn(void *pConn) { SRpcConnInfo connInfo; rpcGetConnInfo(pConn, &connInfo); - SUserObj *pUser = mgmtGetUser(connInfo.user); - if(pUser != NULL) { + + SUserObj *pUser = mgmtGetUser(connInfo.user); + if (pUser != NULL) { return pUser->pAcct; } return NULL; } + +void mgmtDoStatistic(void *handle, void *tmrId) { + if (mgmtDoStatisticFp) { + mgmtDoStatisticFp(handle, tmrId); + } +} \ No newline at end of file diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index cf1c51ad9009903b4a6c5aea10c7a098d7b6df1e..2c898fa16582e3ce6b9bb02c1507fcd5021eba40 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -32,7 +32,6 @@ void (*mgmtCleanupBalance)() = mgmtCleanupBalanceImp; int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) { int selectedVnode = -1; - SDnodeObj *pDnode = &tsDnodeObj; int lastAllocVode = pDnode->lastAllocVnode; for (int i = 0; i < pDnode->numOfVnodes; i++) { diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 5a9e9aff358dfe686f23a4422c8200bc3bd87e36..8c85d4f5f65dcf0ba42ee1f630c89073515e7c07 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -24,7 +24,17 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" -SDnodeObj tsDnodeObj; +int32_t (*mgmtInitDnodesFp)() = NULL; +void (*mgmtCleanUpDnodesFp)() = NULL; +SDnodeObj *(*mgmtGetDnodeFp)(uint32_t ip) = NULL; +int32_t (*mgmtGetDnodesNumFp)() = NULL; +int32_t (*mgmtUpdateDnodeFp)(SDnodeObj *pDnode) = NULL; +void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL; +int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; +int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; +void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL; + +static SDnodeObj tsDnodeObj; void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; @@ -154,7 +164,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } pShow->numOfRows = mgmtGetDnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; @@ -165,9 +177,9 @@ int32_t mgmtGetDnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; + int32_t cols = 0; SDnodeObj *pDnode = NULL; char *pWrite; - int32_t cols = 0; char ipstr[20]; while (numOfRows < rows) { @@ -517,85 +529,94 @@ int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon return numOfRows; } -SDnodeObj *mgmtGetDnodeImp(uint32_t ip) { - return &tsDnodeObj; +int32_t mgmtInitDnodes() { + if (mgmtInitDnodesFp) { + return mgmtInitDnodesFp(); + } else { + tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; + tsDnodeObj.createdTime = taosGetTimestampMs(); + tsDnodeObj.lastReboot = taosGetTimestampSec(); + tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores; + tsDnodeObj.status = TSDB_DN_STATUS_READY; + tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; + tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; + tsDnodeObj.thandle = (void *) (1); //hack way + tsDnodeObj.status = TSDB_DN_STATUS_READY; + mgmtSetDnodeMaxVnodes(&tsDnodeObj); + return 0; + } } -SDnodeObj *(*mgmtGetDnode)(uint32_t ip) = mgmtGetDnodeImp; - -int32_t mgmtUpdateDnodeImp(SDnodeObj *pDnode) { - return 0; +void mgmtCleanUpDnodes() { + if (mgmtCleanUpDnodesFp) { + mgmtCleanUpDnodesFp(); + } } -int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode) = mgmtUpdateDnodeImp; - -void mgmtCleanUpDnodesImp() { +SDnodeObj *mgmtGetDnode(uint32_t ip) { + if (mgmtGetDnodeFp) { + return mgmtGetDnodeFp(ip); + } else { + return &tsDnodeObj; + } } -void (*mgmtCleanUpDnodes)() = mgmtCleanUpDnodesImp; - -int32_t mgmtInitDnodesImp() { - tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; - tsDnodeObj.createdTime = taosGetTimestampMs(); - tsDnodeObj.lastReboot = taosGetTimestampSec(); - tsDnodeObj.numOfCores = (uint16_t) tsNumOfCores; - tsDnodeObj.status = TSDB_DN_STATUS_READY; - tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; - tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; - tsDnodeObj.thandle = (void *) (1); //hack way - if (tsDnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) { - mgmtSetDnodeMaxVnodes(&tsDnodeObj); - mPrint("dnode first access, set total vnodes:%d", tsDnodeObj.numOfVnodes); +int32_t mgmtGetDnodesNum() { + if (mgmtGetDnodesNumFp) { + return mgmtGetDnodesNumFp(); + } else { + return 1; } - - tsDnodeObj.status = TSDB_DN_STATUS_READY; - return 0; } -int32_t (*mgmtInitDnodes)() = mgmtInitDnodesImp; - -int32_t mgmtGetDnodesNumImp() { - return 1; +int32_t mgmtUpdateDnode(SDnodeObj *pDnode) { + if (mgmtUpdateDnodeFp) { + return mgmtUpdateDnodeFp(pDnode); + } else { + return 0; + } } -int32_t (*mgmtGetDnodesNum)() = mgmtGetDnodesNumImp; - -void *mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) { - if (*pDnode == NULL) { - *pDnode = &tsDnodeObj; +void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { + if (mgmtGetNextDnodeFp) { + return mgmtGetNextDnodeFp(pShow, pDnode); } else { - *pDnode = NULL; + if (*pDnode == NULL) { + *pDnode = &tsDnodeObj; + } else { + *pDnode = NULL; + } } return *pDnode; } -void *(*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp; - -int32_t mgmtGetScoresMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - return TSDB_CODE_OPS_NOT_SUPPORT; +int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + if (mgmtGetScoresMetaFp) { + return mgmtGetScoresMetaFp(pMeta, pShow, pConn); + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } } -int32_t (*mgmtGetScoresMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetScoresMetaImp; - -int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - return 0; +int32_t mgmtRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + if (mgmtRetrieveScoresFp) { + return mgmtRetrieveScoresFp(pShow, data, rows, pConn); + } else { + return 0; + } } -int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveScoresImp; - -void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) { +void mgmtSetDnodeUnRemove(SDnodeObj *pDnode) { + if (mgmtSetDnodeUnRemoveFp) { + mgmtSetDnodeUnRemoveFp(pDnode); + } } -void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode) = mgmtSetDnodeUnRemoveImp; - -bool mgmtCheckConfigShowImp(SGlobalConfig *cfg) { +bool mgmtCheckConfigShow(SGlobalConfig *cfg) { if (cfg->cfgType & TSDB_CFG_CTYPE_B_CLUSTER) return false; if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) return false; return true; } - -bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg) = mgmtCheckConfigShowImp; - diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 2169d6731d661afaf31a7794c393c167018435ff..fb3110d93d62e7769f8e5f2635788803811062b5 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -31,10 +31,15 @@ #include "mgmtTable.h" #include "mgmtVgroup.h" +int32_t (*mgmtInitDnodeIntFp)() = NULL; +void (*mgmtCleanUpDnodeIntFp)() = NULL; + void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL; void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL; void *mgmtStatusTimer = NULL; +static void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); + static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) { int32_t contLen = *(int32_t *) (sched->msg - 4); int32_t code = *(int32_t *) (sched->msg - 8); @@ -249,6 +254,10 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { + } else if (msgType == TSDB_MSG_TYPE_STATUS) { + mgmtProcessDnodeStatus(msgType, pConn, contLen, pConn, code); + } else if (msgType == TSDB_MSG_TYPE_GRANT) { + mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); } else { mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]); } @@ -256,8 +265,6 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p //rpcFreeCont(pCont); } - - void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle); } @@ -317,108 +324,113 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { } int32_t mgmtSendCfgDnodeMsg(char *cont) { -#ifdef CLUSTER - char * pMsg, *pStart; - int32_t msgLen = 0; -#endif - - SDnodeObj *pDnode; - SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont; - uint32_t ip; - - ip = inet_addr(pCfg->ip); - pDnode = mgmtGetDnode(ip); - if (pDnode == NULL) { - mError("dnode ip:%s not configured", pCfg->ip); - return TSDB_CODE_NOT_CONFIGURED; - } - - mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config); - int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config); - if (code != -1) { - return code; - } - -#ifdef CLUSTER - pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG); - if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE; - pMsg = pStart; - - memcpy(pMsg, cont, sizeof(SCfgDnodeMsg)); - pMsg += sizeof(SCfgDnodeMsg); - - msgLen = pMsg - pStart; - mgmtSendMsgToDnode(pDnode, pStart, msgLen); -#else - (void)tsCfgDynamicOptions(pCfg->config); -#endif - return 0; +//#ifdef CLUSTER +// char * pMsg, *pStart; +// int32_t msgLen = 0; +//#endif +// +// SDnodeObj *pDnode; +// SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont; +// uint32_t ip; +// +// ip = inet_addr(pCfg->ip); +// pDnode = mgmtGetDnode(ip); +// if (pDnode == NULL) { +// mError("dnode ip:%s not configured", pCfg->ip); +// return TSDB_CODE_NOT_CONFIGURED; +// } +// +// mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config); +// int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config); +// if (code != -1) { +// return code; +// } +// +//#ifdef CLUSTER +// pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG); +// if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE; +// pMsg = pStart; +// +// memcpy(pMsg, cont, sizeof(SCfgDnodeMsg)); +// pMsg += sizeof(SCfgDnodeMsg); +// +// msgLen = pMsg - pStart; +// mgmtSendMsgToDnode(pDnode, pStart, msgLen); +//#else +// (void)tsCfgDynamicOptions(pCfg->config); +//#endif +// return 0; } -int32_t mgmtInitDnodeIntImp() { return 0; } -int32_t (*mgmtInitDnodeInt)() = mgmtInitDnodeIntImp; - -void mgmtCleanUpDnodeIntImp() {} -void (*mgmtCleanUpDnodeInt)() = mgmtCleanUpDnodeIntImp; - -void mgmtProcessDnodeStatusImp(void *handle, void *tmrId) { -/* - SDnodeObj *pObj = &tsDnodeObj; - pObj->openVnodes = tsOpenVnodes; - pObj->status = TSDB_DN_STATUS_READY; +int32_t mgmtInitDnodeInt() { + if (mgmtInitDnodeIntFp) { + return mgmtInitDnodeIntFp(); + } else { + return 0; + } +} - float memoryUsedMB = 0; - taosGetSysMemory(&memoryUsedMB); - pObj->diskAvailable = tsAvailDataDirGB; - - for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { - SVnodeLoad *pVload = &(pObj->vload[vnode]); - SVnodeObj * pVnode = vnodeList + vnode; - - // wait vnode dropped - if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { - if (vnodeList[vnode].cfg.maxSessions <= 0) { - pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; - pVload->status = TSDB_VN_STATUS_OFFLINE; - mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode); - taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr); - } - } +void mgmtCleanUpDnodeInt() { + if (mgmtCleanUpDnodeIntFp) { + mgmtCleanUpDnodeIntFp(); + } +} - if (vnodeList[vnode].cfg.maxSessions <= 0) { - continue; - } +void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { + SStatusMsg *pStatus = (SStatusMsg *)pCont; - pVload->vnode = vnode; - pVload->status = TSDB_VN_STATUS_MASTER; - pVload->totalStorage = pVnode->vnodeStatistic.totalStorage; - pVload->compStorage = pVnode->vnodeStatistic.compStorage; - pVload->pointsWritten = pVnode->vnodeStatistic.pointsWritten; - uint32_t vgId = pVnode->cfg.vgId; - - SVgObj *pVgroup = mgmtGetVgroup(vgId); - if (pVgroup == NULL) { - mError("vgroup:%d is not there, but associated with vnode %d", vgId, vnode); - pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING; - continue; - } + SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp)); + if (pObj == NULL) { + mError("dnode:%s not exist", taosIpStr(pObj->privateIp)); + mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_DNODE_NOT_EXIST, NULL, 0); + return; + } - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("vgroup:%d not belongs to any database, vnode:%d", vgId, vnode); - continue; - } + pObj->lastReboot = htonl(pStatus->lastReboot); + pObj->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); + pObj->openVnodes = htons(pStatus->openVnodes); + pObj->numOfCores = htons(pStatus->numOfCores); + pObj->diskAvailable = pStatus->diskAvailable; + pObj->alternativeRole = pStatus->alternativeRole; +// +// if (mgmtProcessDnodeStatusFp) { +// mgmtProcessDnodeStatusFp(pStatus, pObj, pConn); +// return; +// } - if (pVload->vgId == 0 || pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { - mError("vid:%d, mgmt not exist, drop it", vnode); - pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING; - } - } + pObj->status = TSDB_DN_STATUS_READY; - taosTmrReset(mgmtProcessDnodeStatus, tsStatusInterval * 1000, NULL, tsMgmtTmr, &mgmtStatusTimer); - if (mgmtStatusTimer == NULL) { - mError("Failed to start status timer"); - } -*/ +// // wait vnode dropped +// for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { +// SVnodeLoad *pVload = &(pObj->vload[vnode]); +// if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { +// bool existInDnode = false; +// for (int32_t j = 0; j < pObj->openVnodes; ++j) { +// if (htonl(pStatus->load[j].vnode) == vnode) { +// existInDnode = true; +// break; +// } +// } +// +// if (!existInDnode) { +// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; +// pVload->status = TSDB_VN_STATUS_OFFLINE; +// mgmtUpdateDnode(pObj); +// mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode); +// taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr); +// } +// } else if (pVload->vgId == 0) { +// /* +// * In some cases, vnode information may be reported abnormally, recover it +// */ +// if (pVload->dropStatus != TSDB_VN_DROP_STATUS_READY || pVload->status != TSDB_VN_STATUS_OFFLINE) { +// mPrint("dnode:%s, vid:%d, vgroup:%d status:%s dropStatus:%s, set it to avail status", +// taosIpStr(pObj->privateIp), vnode, pVload->vgId, taosGetVnodeStatusStr(pVload->status), +// taosGetVnodeDropStatusStr(pVload->dropStatus)); +// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; +// pVload->status = TSDB_VN_STATUS_OFFLINE; +// mgmtUpdateDnode(pObj); +// } +// } +// } } -void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId) = mgmtProcessDnodeStatusImp; diff --git a/src/mnode/src/mgmtGrant.c b/src/mnode/src/mgmtGrant.c index 37a0753c23eb77db07d054b6227f0457f2bcf289..df151ac5001ef140c23082922f97b8925ad51672 100644 --- a/src/mnode/src/mgmtGrant.c +++ b/src/mnode/src/mgmtGrant.c @@ -19,31 +19,74 @@ #include "mgmtAcct.h" #include "mgmtGrant.h" -int32_t mgmtCheckUserGrantImp() { return 0; } -int32_t (*mgmtCheckUserGrant)() = mgmtCheckUserGrantImp; +int32_t (*mgmtCheckUserGrantFp)() = NULL; +int32_t (*mgmtCheckDbGrantFp)() = NULL; +void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; +void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; +int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL; +bool (*mgmtCheckExpiredFp)() = NULL; +int32_t (*mgmtGetGrantsMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; +int32_t (*mgmtRetrieveGrantsFp)(SShowObj *pShow, char *data, int rows, void *pConn) = NULL; -int32_t mgmtCheckDbGrantImp() { return 0; } -int32_t (*mgmtCheckDbGrant)() = mgmtCheckDbGrantImp; +int32_t mgmtCheckUserGrant() { + if (mgmtCheckUserGrantFp) { + return mgmtCheckUserGrantFp(); + } else { + return 0; + } +} + +int32_t mgmtCheckDbGrant() { + if (mgmtCheckDbGrantFp) { + return mgmtCheckDbGrantFp(); + } else { + return 0; + } +} -void mgmtAddTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { +void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; + if (mgmtAddTimeSeriesFp) { + mgmtAddTimeSeriesFp(timeSeriesNum); + } } -void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtAddTimeSeriesImp; -void mgmtRestoreTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { +void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; + if (mgmtRestoreTimeSeriesFp) { + mgmtRestoreTimeSeriesFp(timeSeriesNum); + } } -void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtRestoreTimeSeriesImp; -int32_t mgmtCheckTimeSeriesImp(uint32_t timeseries) { return 0; } -int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries) = mgmtCheckTimeSeriesImp; +int32_t mgmtCheckTimeSeries(uint32_t timeseries) { + if (mgmtCheckTimeSeriesFp) { + return mgmtCheckTimeSeriesFp(timeseries); + } else { + return 0; + } +} -bool mgmtCheckExpiredImp() { return false; } -bool (*mgmtCheckExpired)() = mgmtCheckExpiredImp; +bool mgmtCheckExpired() { + if (mgmtCheckExpiredFp) { + return mgmtCheckExpiredFp(); + } else { + return false; + } +} -int32_t mgmtGetGrantsMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; } -int32_t (*mgmtGetGrantsMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetGrantsMetaImp; +int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + if (mgmtGetGrantsMetaFp) { + return mgmtGetGrantsMetaFp(pMeta, pShow, pConn); + } else { + return TSDB_CODE_OPS_NOT_SUPPORT; + } +} -int32_t mgmtRetrieveGrantsImp(SShowObj *pShow, char *data, int rows, void *pConn) { return 0; } -int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn) = mgmtRetrieveGrantsImp; +int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int rows, void *pConn) { + if (mgmtRetrieveGrantsFp) { + return mgmtRetrieveGrantsFp(pShow, data, rows, pConn); + } else { + return 0; + } +} diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index cb9e99135ae992eae8f062bf738264d2d1e7e330..8e38b679204cea9016a63966f7626eccd92c652a 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -14,16 +14,134 @@ */ #define _DEFAULT_SOURCE +#include "tschemautil.h" #include "mgmtMnode.h" +#include "mgmtUser.h" -int32_t mgmtGetMnodeMetaImp(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - return TSDB_CODE_OPS_NOT_SUPPORT; -} +void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; +int32_t (*mgmtInitMnodesFp)() = NULL; +int32_t (*mgmtGetMnodesNumFp)() = NULL; + +static int32_t mgmtGetMnodesNum(); +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode); + +int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + int32_t cols = 0; + + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + SSchema *pSchema = tsGetSchema(pMeta); + + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "IP"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; -int32_t (*mgmtGetMnodeMeta)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetMnodeMetaImp; + pShow->bytes[cols] = 10; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "status"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 10; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "role"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "public ip"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = mgmtGetMnodesNum(); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + pShow->pNode = NULL; -int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) { return 0; } -int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveMnodesImp; +int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + int32_t cols = 0; + SSdbPeer *pMnode = NULL; + char *pWrite; + char ipstr[20]; + + while (numOfRows < rows) { + pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode); + + + pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode); + if (pMnode == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, pMnode->ipstr); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pMnode->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, sdbStatusStr[(uint8_t)pMnode->status]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, sdbRoleStr[(uint8_t)pMnode->role]); + cols++; + + tinet_ntoa(ipstr, pMnode->publicIp); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, ipstr); + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static int32_t mgmtGetMnodesNum() { + if (mgmtGetMnodesNumFp) { + return mgmtGetMnodesNumFp(); + } else { + return 1; + } +} + +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { + if (mgmtGetNextMnodeFp) { + return mgmtGetNextMnodeFp(pShow, pMnode); + } else { + if (*pMnode == NULL) { + *pMnode = &tsMnodeObj; + } else { + *pMnode = NULL; + } + } + + return *pMnode; +} \ No newline at end of file diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 598ec600db73452549a880076eb286dce00d1390..8363138072fa3b50495e1360fe83e91ad35ecb35 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -51,6 +51,7 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); +uint32_t mgmtAccessSquence = 0; void *tsShellConnServer = NULL; void mgmtProcessTranRequest(SSchedMsg *sched) { diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index 1eb114aa8974f914b37fe73284858d8c43707fe7..85c0e0ab0f1f3f3cd63ab42a0016bf6b8461bf11 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" +#include "tmodule.h" #include "tsched.h" #include "mnode.h" #include "mgmtAcct.h" @@ -55,6 +56,20 @@ void mgmtCleanUpSystem() { mPrint("mgmt is cleaned up"); } +int32_t mgmtCheckMgmtRunning() { + if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) { + return -1; + } + + tsetModuleStatus(TSDB_MOD_MGMT); + +// strcpy(sdbMasterIp, mgmtIpStr[0]); + strcpy(sdbPrivateIp, tsPrivateIp); + sdbPublicIp = inet_addr(tsPublicIp); + + return 0; +} + int32_t mgmtStartSystem() { mPrint("starting to initialize TDengine mgmt ..."); @@ -111,10 +126,10 @@ int32_t mgmtStartSystem() { return -1; } - if (mgmtInitShell() < 0) { - mError("failed to init shell"); - return -1; - } +// if (mgmtInitShell() < 0) { +// mError("failed to init shell"); +// return -1; +// } if (sdbInitPeers(tsMgmtDirectory) < 0) { mError("failed to init peers"); @@ -125,39 +140,41 @@ int32_t mgmtStartSystem() { mError("failed to init dnode balance") } - taosTmrReset(mgmtDoStatistic, tsStatusInterval * 30000, NULL, tsMgmtTmr, &tsMgmtStatisTimer); + if (mgmtDoStatistic) { + taosTmrReset(mgmtDoStatistic, tsStatusInterval * 30000, NULL, tsMgmtTmr, &tsMgmtStatisTimer); + } mPrint("TDengine mgmt is initialized successfully"); return 0; } -int32_t mgmtInitSystemImp() { - int32_t code = mgmtStartSystem(); - if (code != 0) { - return code; - } +int32_t mgmtInitSystem() { + struct stat dirstat; + bool directoryExist = (stat(tsMgmtDirectory, &dirstat) == 0); + bool equalWithMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0); - taosTmrReset(mgmtProcessDnodeStatus, 500, NULL, tsMgmtTmr, &mgmtStatusTimer); - return code; -} + if (equalWithMaster || directoryExist) { + if (mgmtStartSystem() != 0) { + return -1; + } + } -int32_t (*mgmtInitSystem)() = mgmtInitSystemImp; + if (mgmtInitShell() < 0) { + mError("failed to init shell"); + return -1; + } -int32_t mgmtCheckMgmtRunningImp() { return 0; } -int32_t (*mgmtCheckMgmtRunning)() = mgmtCheckMgmtRunningImp; - -void mgmtDoStatisticImp(void *handle, void *tmrId) {} - -void (*mgmtDoStatistic)(void *handle, void *tmrId) = mgmtDoStatisticImp; - -void mgmtStopSystemImp() {} - -void (*mgmtStopSystem)() = mgmtStopSystemImp; - -void mgmtCleanUpRedirectImp() {} +void mgmtStopSystem() { + if (sdbMaster) { + mTrace("it is a master mgmt node, it could not be stopped"); + return; + } -void (*mgmtCleanUpRedirect)() = mgmtCleanUpRedirectImp; + mgmtCleanUpSystem(); + remove(tsMgmtDirectory); +// mgmtInitRedirect(); +} diff --git a/src/os/linux/src/tsystem.c b/src/os/linux/src/tsystem.c index 8cd0e6943616f4ecb1f69ed100ca6535968005ae..ea7b64980f6cf796af26a4a323c60bf041b9c39f 100644 --- a/src/os/linux/src/tsystem.c +++ b/src/os/linux/src/tsystem.c @@ -585,6 +585,7 @@ void tsPrintOsInfo() { pPrint(" os release: %s", buf.release); pPrint(" os version: %s", buf.version); pPrint(" os machine: %s", buf.machine); + pPrint("=================================="); } void taosKillSystem() { diff --git a/src/util/inc/tglobalcfg.h b/src/util/inc/tglobalcfg.h index 018f5dbcbb75b8a7ba7577f36bdf88f43c7b7e3b..bbb824cd3d9100f44d9abf7d5085a63e72b04eff 100644 --- a/src/util/inc/tglobalcfg.h +++ b/src/util/inc/tglobalcfg.h @@ -193,7 +193,6 @@ extern char tsCharset[64]; // default encode string // void tsReadGlobalLogConfig(); bool tsReadGlobalConfig(); -bool tsReadGlobalConfigSpec(); int tsCfgDynamicOptions(char *msg); void tsPrintGlobalConfig(); void tsPrintGlobalConfigSpec(); diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index cdb8d7c8f2f93821342cf052728c0335b52090b8..a49873d30a83c02c2faa4bebbca36190d31a389f 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -24,6 +24,9 @@ #include "tsystem.h" #include "tutil.h" +void (*tsReadStorageConfig)() = NULL; +void (*tsPrintStorageConfig)() = NULL; + // monitor module api int (*startMonitor)() = NULL; void (*stopMonitor)() = NULL; @@ -942,7 +945,9 @@ bool tsReadGlobalConfig() { fclose(fp); } - tsReadGlobalConfigSpec(); + if (tsReadStorageConfig) { + tsReadStorageConfig(); + } if (tsPrivateIp[0] == 0) { taosGetPrivateIp(tsPrivateIp); @@ -1111,11 +1116,13 @@ void tsPrintGlobalConfig() { } } - tsPrintGlobalConfigSpec(); + if (tsPrintStorageConfig) { + tsPrintStorageConfig(); + } else { + pPrint(" dataDir: %s", dataDir); + } tsPrintOsInfo(); - - pPrint("=================================="); } void tsSetAllDebugFlag() { @@ -1206,12 +1213,3 @@ void tsSetTimeZone() { pPrint("timezone format changed to %s", tsTimezone); } -#ifndef CLUSTER - -bool tsReadGlobalConfigSpec() { return true; } - -void tsPrintGlobalConfigSpec() { - pPrint(" dataDir: %s", dataDir); -} - -#endif