diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e09f6d11bdea1f3dbf5a08ab37d3086c9c1d2ed4..a0f83200c9b8f0380b5d33ebab4999c88d5a80d0 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -52,6 +52,12 @@ extern bool tsEnableSlaveQuery; extern bool tsPrintAuth; extern int64_t tsTickPerDay[3]; +// monitor +extern bool tsEnableMonitor; +extern int32_t tsMonitorInterval; +extern char tsMonitorFqdn[]; +extern uint16_t tsMonitorPort; + // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d66895888b60d4c7d265079513ab5415b3ed9507..9905cf9833cf16e000cb4e0ca308d2144db7466a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -46,6 +46,12 @@ int32_t tsMaxBinaryDisplayWidth = 30; bool tsEnableSlaveQuery = 1; bool tsPrintAuth = 0; +// monitor +bool tsEnableMonitor = 1; +int32_t tsMonitorInterval = 5; +char tsMonitorFqdn[TSDB_FQDN_LEN] = {0}; +uint16_t tsMonitorPort = 6043; + /* * denote if the server needs to compress response message at the application layer to client, including query rsp, * metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server. @@ -314,6 +320,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; + + if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; + if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1; + return 0; } @@ -345,7 +357,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) { } static void taosSetClientCfg(SConfig *pCfg) { - tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_EP_LEN); + tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort); @@ -425,6 +437,11 @@ static void taosSetServerCfg(SConfig *pCfg) { tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; + tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; + tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; + tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN); + tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32; + if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/dnode/mgmt/impl/src/dndEnv.c b/source/dnode/mgmt/impl/src/dndEnv.c index 9f6c22067d7522dc80a5a4d11294dff6ff362fe2..da75b4524adfc4b8a428b0bfce286427eb4b3fe0 100644 --- a/source/dnode/mgmt/impl/src/dndEnv.c +++ b/source/dnode/mgmt/impl/src/dndEnv.c @@ -140,7 +140,7 @@ static int32_t dndInitDir(SDnode *pDnode, SDnodeObjCfg *pCfg) { return 0; } -static void dndCloseImp(SDnode *pDnode) { +static void dndCloseDir(SDnode *pDnode) { tfree(pDnode->dir.mnode); tfree(pDnode->dir.vnodes); tfree(pDnode->dir.dnode); @@ -260,7 +260,7 @@ void dndClose(SDnode *pDnode) { dndCleanupMgmt(pDnode); tfsClose(pDnode->pTfs); - dndCloseImp(pDnode); + dndCloseDir(pDnode); free(pDnode); dInfo("dnode object is closed, data:%p", pDnode); } @@ -288,9 +288,8 @@ int32_t dndInit() { return -1; } - SVnodeOpt vnodeOpt = {.nthreads = tsNumOfCommitThreads, - .putReqToVQueryQFp = dndPutReqToVQueryQ, - .sendReqToDnodeFp = dndSendReqToDnode}; + SVnodeOpt vnodeOpt = { + .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = dndSendReqToDnode}; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode since %s", terrstr()); diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 8f18222ab663e27f6e022d8a080ba0147e7750fb..ac9f53a935a236ebad2a1fdedf59cfc09c36b4ee 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -473,19 +473,39 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { rpcSendResponse(&rpcRsp); } +static void dndSendMonitorReport(SDnode *pDnode) { + if (!tsEnableMonitor || tsMonitorFqdn[0] == 0) return; + + dTrace("pDnode:%p, send monitor report to %s:%u", pDnode, tsMonitorFqdn, tsMonitorPort); +} + static void *dnodeThreadRoutine(void *param) { SDnode *pDnode = param; SDnodeMgmt *pMgmt = &pDnode->dmgmt; - int32_t ms = tsStatusInterval * 1000; + int64_t lastStatusTime = taosGetTimestampMs(); + int64_t lastMonitorTime = lastStatusTime; setThreadName("dnode-hb"); while (true) { pthread_testcancel(); - taosMsleep(ms); + taosMsleep(200); + if (dndGetStat(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) { + continue; + } - if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) { + int64_t curTime = taosGetTimestampMs(); + + float statusInterval = (curTime - lastStatusTime) / 1000.0f; + if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) { dndSendStatusReq(pDnode); + lastStatusTime = curTime; + } + + float monitorInterval = (curTime - lastMonitorTime) / 1000.0f; + if (monitorInterval >= tsMonitorInterval) { + dndSendMonitorReport(pDnode); + lastMonitorTime = curTime; } } }