/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "mndAcct.h" #include "mndCluster.h" #include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" #include "mndFunc.h" #include "mndGrant.h" #include "mndInfoSchema.h" #include "mndMnode.h" #include "mndPerfSchema.h" #include "mndPrivilege.h" #include "mndProfile.h" #include "mndQnode.h" #include "mndQuery.h" #include "mndShow.h" #include "mndSma.h" #include "mndSnode.h" #include "mndStb.h" #include "mndStream.h" #include "mndSubscribe.h" #include "mndSync.h" #include "mndTelem.h" #include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" static inline int32_t mndAcquireRpc(SMnode *pMnode) { int32_t code = 0; taosThreadRwlockRdlock(&pMnode->lock); if (pMnode->stopped) { terrno = TSDB_CODE_APP_NOT_READY; code = -1; } else if (!mndIsLeader(pMnode)) { code = -1; } else { #if 1 atomic_add_fetch_32(&pMnode->rpcRef, 1); #else int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); mTrace("mnode rpc is acquired, ref:%d", ref); #endif } taosThreadRwlockUnlock(&pMnode->lock); return code; } static inline void mndReleaseRpc(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); #if 1 atomic_sub_fetch_32(&pMnode->rpcRef, 1); #else int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); mTrace("mnode rpc is released, ref:%d", ref); #endif taosThreadRwlockUnlock(&pMnode->lock); } static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq); if (contLen <= 0) return NULL; void *pReq = rpcMallocCont(contLen); if (pReq == NULL) return NULL; tSerializeSMTimerMsg(pReq, contLen, &timerReq); *pContLen = contLen; return pReq; } static void mndPullupTrans(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } static void mndPullupTtl(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } static void mndPullupTelem(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } static void mndPullupGrant(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } static void mndIncreaseUpTime(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } static void *mndThreadFp(void *param) { SMnode *pMnode = param; int64_t lastTime = 0; setThreadName("mnode-timer"); while (1) { lastTime++; taosMsleep(100); if (mndGetStop(pMnode)) break; if (lastTime % 10 != 0) continue; int64_t sec = lastTime / 10; if (sec % tsTtlPushInterval == 0) { mndPullupTtl(pMnode); } if (sec % tsTransPullupInterval == 0) { mndPullupTrans(pMnode); } if (sec % tsMqRebalanceInterval == 0) { mndCalMqRebalance(pMnode); } if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { mndPullupTelem(pMnode); } if (sec % tsGrantHBInterval == 0) { mndPullupGrant(pMnode); } if (sec % tsUptimeInterval == 0) { mndIncreaseUpTime(pMnode); } } return NULL; } static int32_t mndInitTimer(SMnode *pMnode) { TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) { mError("failed to create timer thread since %s", strerror(errno)); return -1; } taosThreadAttrDestroy(&thAttr); tmsgReportStartup("mnode-timer", "initialized"); return 0; } static void mndCleanupTimer(SMnode *pMnode) { if (taosCheckPthreadValid(pMnode->thread)) { taosThreadJoin(pMnode->thread, NULL); taosThreadClear(&pMnode->thread); } } static int32_t mndCreateDir(SMnode *pMnode, const char *path) { pMnode->path = strdup(path); if (pMnode->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } if (taosMkDir(pMnode->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } return 0; } static int32_t mndInitWal(SMnode *pMnode) { char path[PATH_MAX + 20] = {0}; snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); SWalCfg cfg = { .vgId = 1, .fsyncPeriod = 0, .rollPeriod = -1, .segSize = -1, .retentionPeriod = -1, .retentionSize = -1, .level = TAOS_WAL_FSYNC, }; pMnode->pWal = walOpen(path, &cfg); if (pMnode->pWal == NULL) { mError("failed to open wal since %s. wal:%s", terrstr(), path); return -1; } return 0; } static void mndCloseWal(SMnode *pMnode) { if (pMnode->pWal != NULL) { walClose(pMnode->pWal); pMnode->pWal = NULL; } } static int32_t mndInitSdb(SMnode *pMnode) { SSdbOpt opt = {0}; opt.path = pMnode->path; opt.pMnode = pMnode; opt.pWal = pMnode->pWal; opt.sync = pMnode->syncMgmt.sync; pMnode->pSdb = sdbInit(&opt); if (pMnode->pSdb == NULL) { return -1; } return 0; } static int32_t mndOpenSdb(SMnode *pMnode) { if (!pMnode->deploy) { return sdbReadFile(pMnode->pSdb); } else { return 0; } } static void mndCleanupSdb(SMnode *pMnode) { if (pMnode->pSdb) { sdbCleanup(pMnode->pSdb); pMnode->pSdb = NULL; } } static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) { SMnodeStep step = {0}; step.name = name; step.initFp = initFp; step.cleanupFp = cleanupFp; if (taosArrayPush(pMnode->pSteps, &step) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } return 0; } static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1; if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1; if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1; if (mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1; if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1; if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1; if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1; if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1; return 0; } static void mndCleanupSteps(SMnode *pMnode, int32_t pos) { if (pMnode->pSteps == NULL) return; if (pos == -1) { pos = taosArrayGetSize(pMnode->pSteps) - 1; } for (int32_t s = pos; s >= 0; s--) { SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s); mInfo("%s will cleanup", pStep->name); if (pStep->cleanupFp != NULL) { (*pStep->cleanupFp)(pMnode); } } taosArrayClear(pMnode->pSteps); taosArrayDestroy(pMnode->pSteps); pMnode->pSteps = NULL; } static int32_t mndExecSteps(SMnode *pMnode) { int32_t size = taosArrayGetSize(pMnode->pSteps); for (int32_t pos = 0; pos < size; pos++) { SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos); if (pStep->initFp == NULL) continue; if ((*pStep->initFp)(pMnode) != 0) { int32_t code = terrno; mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr()); mndCleanupSteps(pMnode, pos); terrno = code; return -1; } else { mInfo("%s is initialized", pStep->name); tmsgReportStartup(pStep->name, "initialized"); } } pMnode->clusterId = mndGetClusterId(pMnode); return 0; } static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->msgCb = pOption->msgCb; pMnode->selfDnodeId = pOption->dnodeId; pMnode->syncMgmt.selfIndex = pOption->selfIndex; pMnode->syncMgmt.numOfReplicas = pOption->numOfReplicas; memcpy(pMnode->syncMgmt.replicas, pOption->replicas, sizeof(pOption->replicas)); } SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { mInfo("start to open mnode in %s", path); SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode)); if (pMnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to open mnode since %s", terrstr()); return NULL; } memset(pMnode, 0, sizeof(SMnode)); char timestr[24] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); mndSetOptions(pMnode, pOption); pMnode->deploy = pOption->deploy; pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep)); if (pMnode->pSteps == NULL) { taosMemoryFree(pMnode); terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to open mnode since %s", terrstr()); return NULL; } int32_t code = mndCreateDir(pMnode, path); if (code != 0) { code = terrno; mError("failed to open mnode since %s", terrstr()); mndClose(pMnode); terrno = code; return NULL; } code = mndInitSteps(pMnode); if (code != 0) { code = terrno; mError("failed to open mnode since %s", terrstr()); mndClose(pMnode); terrno = code; return NULL; } code = mndExecSteps(pMnode); if (code != 0) { code = terrno; mError("failed to open mnode since %s", terrstr()); mndClose(pMnode); terrno = code; return NULL; } mInfo("mnode open successfully"); return pMnode; } void mndPreClose(SMnode *pMnode) { if (pMnode != NULL) { atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0); syncLeaderTransfer(pMnode->syncMgmt.sync); #if 0 mInfo("vgId:1, mnode start leader transfer"); // wait for leader transfer finish while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) { taosMsleep(10); mInfo("vgId:1, mnode waiting for leader transfer"); } mInfo("vgId:1, mnode finish leader transfer"); #endif } } void mndClose(SMnode *pMnode) { if (pMnode != NULL) { mInfo("start to close mnode"); mndCleanupSteps(pMnode, -1); taosMemoryFreeClear(pMnode->path); taosMemoryFreeClear(pMnode); mInfo("mnode is closed"); } } int32_t mndStart(SMnode *pMnode) { mndSyncStart(pMnode); if (pMnode->deploy) { if (sdbDeploy(pMnode->pSdb) != 0) { mError("failed to deploy sdb while start mnode"); return -1; } mndSetRestored(pMnode, true); } grantReset(pMnode, TSDB_GRANT_ALL, 0); return mndInitTimer(pMnode); } void mndStop(SMnode *pMnode) { mndSetStop(pMnode); mndSyncStop(pMnode); mndCleanupTimer(pMnode); } int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; mInfo("vgId:%d, process sync ctrl msg", 1); if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); if (pSyncNode == NULL) { mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg); code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg); syncHeartbeatDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg); code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); syncHeartbeatReplyDestroy(pSyncMsg); } syncNodeRelease(pSyncNode); if (code != 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } return code; } int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); if (pSyncNode == NULL) { mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); code = syncNodeOnTimer(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); code = syncNodeOnPing(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); code = syncNodeOnPingReply(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL); syncClientRequestDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); code = syncNodeOnRequestVote(pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); syncSnapshotSendDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); syncLocalCmdDestroy(pSyncMsg); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); code = -1; } syncNodeRelease(pSyncNode); if (code != 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } return code; } static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY || pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) { return 0; } if (mndAcquireRpc(pMsg->info.node) == 0) return 0; SMnode *pMnode = pMsg->info.node; const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync); bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync); if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, pMnode->stopped, restored, role); return -1; } const STraceId *trace = &pMsg->info.traceId; SEpSet epSet = {0}; mndGetMnodeEpSet(pMnode, &epSet); mDebug( "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d " "role:%s, redirect numOfEps:%d inUse:%d", pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps, epSet.inUse); if (epSet.numOfEps > 0) { for (int32_t i = 0; i < epSet.numOfEps; ++i) { mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); } int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); pMsg->info.rsp = rpcMallocCont(contLen); pMsg->info.hasEpSet = 1; if (pMsg->info.rsp != NULL) { tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); pMsg->info.rspLen = contLen; terrno = TSDB_CODE_RPC_REDIRECT; } else { terrno = TSDB_CODE_OUT_OF_MEMORY; } } else { terrno = TSDB_CODE_APP_NOT_READY; } return -1; } static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; const STraceId *trace = &pMsg->info.traceId; mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; return -1; } int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; if (fp == NULL) { mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED; return -1; } if (mndCheckMsgContent(pMsg) != 0) return -1; if (mndCheckMnodeState(pMsg) != 0) return -1; mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); int32_t code = (*fp)(pMsg); mndReleaseRpc(pMnode); if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mGTrace("msg:%p, won't response immediately since in progress", pMsg); } else if (code == 0) { mGTrace("msg:%p, successfully processed", pMsg); } else { mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); } return code; } void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { tmsg_t type = TMSG_INDEX(msgType); if (type < TDMT_MAX) { pMnode->msgFp[type] = fp; } } // Note: uid 0 is reserved int64_t mndGenerateUid(const char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); do { int64_t us = taosGetTimestampUs(); int64_t x = (us & 0x000000FFFFFFFFFF) << 24; int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); if (uuid) { return llabs(uuid); } } while (true); } int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { if (mndAcquireRpc(pMnode) != 0) return -1; SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc)); pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc)); pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc)); if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL || pStbInfo->stbs == NULL) { mndReleaseRpc(pMnode); return -1; } // cluster info tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version)); pClusterInfo->monitor_interval = tsMonitorInterval; pClusterInfo->connections_total = mndGetNumOfConnections(pMnode); pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB); pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB); void *pIter = NULL; while (1) { SDnodeObj *pObj = NULL; pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj); if (pIter == NULL) break; SMonDnodeDesc desc = {0}; desc.dnode_id = pObj->id; tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep)); if (mndIsDnodeOnline(pObj, ms)) { tstrncpy(desc.status, "ready", sizeof(desc.status)); } else { tstrncpy(desc.status, "offline", sizeof(desc.status)); } taosArrayPush(pClusterInfo->dnodes, &desc); sdbRelease(pSdb, pObj); } pIter = NULL; while (1) { SMnodeObj *pObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); if (pIter == NULL) break; SMonMnodeDesc desc = {0}; desc.mnode_id = pObj->id; tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep)); if (pObj->id == pMnode->selfDnodeId) { pClusterInfo->first_ep_dnode_id = pObj->id; tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep)); pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode); // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f); tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role)); } else { tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role)); } taosArrayPush(pClusterInfo->mnodes, &desc); sdbRelease(pSdb, pObj); } // vgroup info pIter = NULL; while (1) { SVgObj *pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; pClusterInfo->vgroups_total++; pClusterInfo->tbs_total += pVgroup->numOfTables; SMonVgroupDesc desc = {0}; desc.vgroup_id = pVgroup->vgId; SName name = {0}; tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameGetDbName(&name, desc.database_name); desc.tables_num = pVgroup->numOfTables; pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; tstrncpy(desc.status, "unsynced", sizeof(desc.status)); for (int32_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; pVnDesc->dnode_id = pVgid->dnodeId; tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role)); if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) { tstrncpy(desc.status, "ready", sizeof(desc.status)); pClusterInfo->vgroups_alive++; } if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) { pClusterInfo->vnodes_alive++; } pClusterInfo->vnodes_total++; } taosArrayPush(pVgroupInfo->vgroups, &desc); sdbRelease(pSdb, pVgroup); } // stb info pIter = NULL; while (1) { SStbObj *pStb = NULL; pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); if (pIter == NULL) break; SMonStbDesc desc = {0}; SName name1 = {0}; tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameGetDbName(&name1, desc.database_name); SName name2 = {0}; tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN); taosArrayPush(pStbInfo->stbs, &desc); sdbRelease(pSdb, pStb); } // grant info pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f; pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed; if (pMnode->grant.expireTimeMS == 0) { pGrantInfo->expire_time = INT32_MAX; pGrantInfo->timeseries_total = INT32_MAX; } mndReleaseRpc(pMnode); return 0; } int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync); pLoad->syncRestore = pMnode->restored; mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore); return 0; } void mndSetRestored(SMnode *pMnode, bool restored) { if (restored) { taosThreadRwlockWrlock(&pMnode->lock); pMnode->restored = true; taosThreadRwlockUnlock(&pMnode->lock); mTrace("mnode set restored:%d", restored); } else { taosThreadRwlockWrlock(&pMnode->lock); pMnode->restored = false; taosThreadRwlockUnlock(&pMnode->lock); mTrace("mnode set restored:%d", restored); while (1) { if (pMnode->rpcRef <= 0) break; taosMsleep(3); } } } bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; } void mndSetStop(SMnode *pMnode) { taosThreadRwlockWrlock(&pMnode->lock); pMnode->stopped = true; taosThreadRwlockUnlock(&pMnode->lock); mTrace("mnode set stopped"); } bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; }