From 4690773668d69ef44ee4d2ed1c4c8e950165c6ba Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 2 Nov 2021 14:27:20 +0800 Subject: [PATCH] refact dnode-dnode file --- include/common/taosmsg.h | 143 +++---- include/common/tglobal.h | 2 +- include/server/vnode/vnode.h | 4 - source/common/src/tglobal.c | 10 +- source/dnode/mgmt/inc/dnodeDnode.h | 19 +- source/dnode/mgmt/inc/dnodeInt.h | 1 + source/dnode/mgmt/inc/dnodeMnode.h | 1 + source/dnode/mgmt/inc/dnodeVnodes.h | 2 + source/dnode/mgmt/src/dnodeDnode.c | 539 +++++++++++++------------ source/dnode/mgmt/src/dnodeMnode.c | 9 +- source/dnode/mgmt/src/dnodeTransport.c | 175 ++++---- source/dnode/mgmt/src/dnodeVnodes.c | 6 +- source/dnode/mnode/inc/mnodeDef.h | 2 +- source/dnode/mnode/src/mnodeWorker.c | 12 +- source/libs/transport/src/rpcMain.c | 2 +- 15 files changed, 455 insertions(+), 472 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index dde737162b..c214c96f35 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -65,30 +65,6 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) -// message from mnode to dnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" ) - - // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -121,6 +97,29 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" ) + +// message from mnode to dnode +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" ) @@ -133,9 +132,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" ) // message from dnode to mnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" ) @@ -585,20 +584,6 @@ typedef struct SRetrieveTableRsp { char data[]; } SRetrieveTableRsp; -typedef struct { - int32_t vgId; - int32_t dbCfgVersion; - int64_t totalStorage; - int64_t compStorage; - int64_t pointsWritten; - uint64_t vnodeVersion; - int32_t vgCfgVersion; - uint8_t status; - uint8_t role; - uint8_t replica; - uint8_t compact; -} SVnodeLoad; - typedef struct { char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; int32_t cacheBlockSize; //MB @@ -665,28 +650,47 @@ typedef struct { uint8_t ignoreNotExists; } SDropDbMsg, SUseDbMsg, SSyncDbMsg; -// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed -// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE typedef struct { - int64_t pointsWritten; // In unit of points - int64_t totalStorage; // In unit of bytes - int64_t compStorage; // In unit of bytes - int64_t queryTime; // In unit of second ?? - char reserved[64]; -} SVnodeStatisticInfo; + int32_t statusInterval; + int8_t reserved[4]; + int64_t checkTime; // 1970-01-01 00:00:00.000 + char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone + char locale[TSDB_LOCALE_LEN]; // tsLocale + char charset[TSDB_LOCALE_LEN]; // tsCharset +} SClusterCfg; -typedef struct SVgroupAccess { - int32_t vgId; - int8_t accessState; -} SVgroupAccess; +typedef struct { + int32_t vgId; + int8_t status; + int8_t role; + int8_t reserved[2]; + int64_t totalStorage; + int64_t compStorage; + int64_t pointsWritten; + int64_t tablesNum; +} SVnodeLoad; + +typedef struct { + int32_t vnodeNum; + SVnodeLoad vnodeLoads[]; +} SVnodeLoads; + +typedef struct SStatusMsg { + uint32_t sversion; + int32_t dnodeId; + int64_t clusterId; + uint32_t rebootTime; // time stamp for last reboot + int32_t numOfCores; + char dnodeEp[TSDB_EP_LEN]; + SClusterCfg clusterCfg; + SVnodeLoads vnodeLoads; +} SStatusMsg; typedef struct { int32_t dnodeId; int8_t dropped; - char reserved[19]; + char reserved[3]; int64_t clusterId; - int32_t numOfDnodes; - int32_t numOfVnodes; } SDnodeCfg; typedef struct { @@ -703,31 +707,8 @@ typedef struct { } SDnodeEps; typedef struct { - int32_t statusInterval; // tsStatusInterval - int8_t reserved[36]; - int64_t checkTime; // 1970-01-01 00:00:00.000 - char timezone[64]; // tsTimezone - char locale[TSDB_LOCALE_LEN]; // tsLocale - char charset[TSDB_LOCALE_LEN]; // tsCharset -} SClusterCfg; - -typedef struct SStatusMsg { - uint32_t version; - int32_t dnodeId; - uint32_t lastReboot; // time stamp for last reboot - int32_t openVnodes; - int32_t numOfCores; - float diskAvailable; - int8_t reserved[36]; - char dnodeEp[TSDB_EP_LEN]; - int64_t clusterId; - SClusterCfg clusterCfg; - SVnodeLoad load[]; -} SStatusMsg; - -typedef struct { - SDnodeCfg dnodeCfg; - SVgroupAccess vgAccess[]; + SDnodeCfg dnodeCfg; + SDnodeEps dnodeEps; } SStatusRsp; typedef struct { diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 092f31cbde..f3fce8becd 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -198,7 +198,7 @@ extern SDiskCfg tsDiskCfg[]; void taosInitGlobalCfg(); int32_t taosCheckGlobalCfg(); -bool taosCfgDynamicOptions(char *msg); +int32_t taosCfgDynamicOptions(char *msg); int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId); void taosAddDataDir(int index, char *v1, int level, int primary); diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index 36112a2cf8..a9f40aad77 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -58,10 +58,6 @@ typedef struct { int8_t syncRole; } SVnodeStatus; -typedef struct { - int32_t accessState; -} SVnodeAccess; - typedef struct SVnodeMsg { int32_t msgType; int32_t code; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e74eb89aef..4f30327c06 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -277,13 +277,13 @@ void taosSetAllDebugFlag() { } } -bool taosCfgDynamicOptions(char *msg) { +int32_t taosCfgDynamicOptions(char *msg) { char *option, *value; int32_t olen, vlen; int32_t vint = 0; paGetToken(msg, &option, &olen); - if (olen == 0) return false;; + if (olen == 0) return -1;; paGetToken(option + olen + 1, &value, &vlen); if (vlen == 0) @@ -324,18 +324,18 @@ bool taosCfgDynamicOptions(char *msg) { uError("monitor can't be updated, for monitor not initialized"); } } - return true; + return 0; } if (strncasecmp(cfg->option, "debugFlag", olen) == 0) { taosSetAllDebugFlag(); } - return true; + return 0; } if (strncasecmp(option, "resetlog", 8) == 0) { taosResetLog(); taosPrintGlobalCfg(); - return true; + return 0; } if (strncasecmp(option, "resetQueryCache", 15) == 0) { diff --git a/source/dnode/mgmt/inc/dnodeDnode.h b/source/dnode/mgmt/inc/dnodeDnode.h index aba80752a2..9a2b564bd4 100644 --- a/source/dnode/mgmt/inc/dnodeDnode.h +++ b/source/dnode/mgmt/inc/dnodeDnode.h @@ -23,23 +23,14 @@ extern "C" { int32_t dnodeInitDnode(); void dnodeCleanupDnode(); -void dnodeProcessStatusRsp(SRpcMsg *pMsg); -void dnodeProcessStartupReq(SRpcMsg *pMsg); -void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); +void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); -int32_t dnodeInitConfig(); -void dnodeCleanupConfig(); - -void dnodeUpdateCfg(SDnodeCfg *data); -void dnodeUpdateDnodeEps(SDnodeEps *data); -void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet); int32_t dnodeGetDnodeId(); int64_t dnodeGetClusterId(); -void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); - -void dnodeGetEpSetForPeer(SRpcEpSet *epSet); -void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); - +void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); +void dnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet); +void dnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h index bccafcee85..1306f2d700 100644 --- a/source/dnode/mgmt/inc/dnodeInt.h +++ b/source/dnode/mgmt/inc/dnodeInt.h @@ -35,6 +35,7 @@ extern int32_t dDebugFlag; #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; +typedef void (*MsgFp)(SRpcMsg *pMsg, SRpcEpSet *pEpSet); int32_t dnodeInit(); void dnodeCleanup(); diff --git a/source/dnode/mgmt/inc/dnodeMnode.h b/source/dnode/mgmt/inc/dnodeMnode.h index 00868535c1..e696acf425 100644 --- a/source/dnode/mgmt/inc/dnodeMnode.h +++ b/source/dnode/mgmt/inc/dnodeMnode.h @@ -23,6 +23,7 @@ extern "C" { int32_t dnodeInitMnode(); void dnodeCleanupMnode(); +void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/inc/dnodeVnodes.h index fae1de7217..46dcf549d4 100644 --- a/source/dnode/mgmt/inc/dnodeVnodes.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -23,6 +23,8 @@ extern "C" { int32_t dnodeInitVnodes(); void dnodeCleanupVnodes(); +void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); +void dnodeGetVnodes(SVnodeLoads *pVloads); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 722ef30099..64694e9a46 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -16,70 +16,83 @@ #define _DEFAULT_SOURCE #include "dnodeDnode.h" #include "dnodeTransport.h" -#include "tthread.h" -#include "ttime.h" +#include "dnodeVnodes.h" #include "cJSON.h" #include "thash.h" +#include "tthread.h" +#include "ttime.h" static struct { int32_t dnodeId; - int32_t dropped; int64_t clusterId; SDnodeEps *dnodeEps; SHashObj *dnodeHash; SRpcEpSet mnodeEpSetForShell; SRpcEpSet mnodeEpSetForPeer; char file[PATH_MAX + 20]; + uint32_t rebootTime; + int8_t dropped; + int8_t threadStop; + pthread_t *threadId; pthread_mutex_t mutex; -} tsConfig; + MsgFp msgFp[TSDB_MSG_TYPE_MAX]; +} tsDnode = {0}; -void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { - pthread_mutex_lock(&tsConfig.mutex); - *epSet = tsConfig.mnodeEpSetForPeer; - pthread_mutex_unlock(&tsConfig.mutex); +int32_t dnodeGetDnodeId() { + int32_t dnodeId = 0; + pthread_mutex_lock(&tsDnode.mutex); + dnodeId = tsDnode.dnodeId; + pthread_mutex_unlock(&tsDnode.mutex); + return dnodeId; } -static void dnodeGetEpSetForShell(SRpcEpSet *epSet) { - pthread_mutex_lock(&tsConfig.mutex); - *epSet = tsConfig.mnodeEpSetForShell; - pthread_mutex_unlock(&tsConfig.mutex); +int64_t dnodeGetClusterId() { + int64_t clusterId = 0; + pthread_mutex_lock(&tsDnode.mutex); + clusterId = tsDnode.clusterId; + pthread_mutex_unlock(&tsDnode.mutex); + return clusterId; } -void dnodeUpdateMnodeEps(SRpcEpSet *ep) { - if (ep != NULL || ep->numOfEps <= 0) { - dError("mnode is changed, but content is invalid, discard it"); - return; - } +void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) { + pthread_mutex_lock(&tsDnode.mutex); - pthread_mutex_lock(&tsConfig.mutex); + SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t)); + if (pEp != NULL) { + if (port) *port = pEp->dnodePort; + if (fqdn) tstrncpy(fqdn, pEp->dnodeFqdn, TSDB_FQDN_LEN); + if (ep) snprintf(ep, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort); + } - dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse); + pthread_mutex_unlock(&tsDnode.mutex); +} - tsConfig.mnodeEpSetForPeer = *ep; - for (int32_t i = 0; i < ep->numOfEps; ++i) { - ep->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); - } - tsConfig.mnodeEpSetForShell = *ep; +void dnodeGetMnodeEpSetForPeer(SRpcEpSet *pEpSet) { + pthread_mutex_lock(&tsDnode.mutex); + *pEpSet = tsDnode.mnodeEpSetForPeer; + pthread_mutex_unlock(&tsDnode.mutex); +} - pthread_mutex_unlock(&tsConfig.mutex); +void dnodeGetMnodeEpSetForShell(SRpcEpSet *pEpSet) { + pthread_mutex_lock(&tsDnode.mutex); + *pEpSet = tsDnode.mnodeEpSetForShell; + pthread_mutex_unlock(&tsDnode.mutex); } -void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { - SRpcConnInfo connInfo = {0}; - rpcGetConnInfo(rpcMsg->handle, &connInfo); +void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) { + int32_t msgType = pMsg->msgType; SRpcEpSet epSet = {0}; if (forShell) { - dnodeGetEpSetForShell(&epSet); + dnodeGetMnodeEpSetForShell(&epSet); } else { - dnodeGetEpSetForPeer(&epSet); + dnodeGetMnodeEpSetForPeer(&epSet); } - dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse); + dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); + dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) { if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) || (epSet.port[i] == tsServerPort && forShell)) { @@ -91,71 +104,88 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { epSet.port[i] = htons(epSet.port[i]); } - rpcSendRedirectRsp(rpcMsg->handle, &epSet); + rpcSendRedirectRsp(pMsg->handle, &epSet); +} + +static void dnodeUpdateMnodeEpSet(SRpcEpSet *pEpSet) { + if (pEpSet == NULL || pEpSet->numOfEps <= 0) { + dError("mnode is changed, but content is invalid, discard it"); + return; + } else { + dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); + } + + pthread_mutex_lock(&tsDnode.mutex); + + tsDnode.mnodeEpSetForPeer = *pEpSet; + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { + pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; + dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); + } + tsDnode.mnodeEpSetForShell = *pEpSet; + + pthread_mutex_unlock(&tsDnode.mutex); } static void dnodePrintEps() { - dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum); - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + dDebug("print dnode endpoint list, num:%d", tsDnode.dnodeEps->dnodeNum); + for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode); } } -static void dnodeResetEps(SDnodeEps *data) { - assert(data != NULL); - - int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp); +static void dnodeResetEps(SDnodeEps *pEps) { + assert(pEps != NULL); + int32_t size = sizeof(SDnodeEps) + pEps->dnodeNum * sizeof(SDnodeEp); - if (data->dnodeNum > tsConfig.dnodeEps->dnodeNum) { + if (pEps->dnodeNum > tsDnode.dnodeEps->dnodeNum) { SDnodeEps *tmp = calloc(1, size); if (tmp == NULL) return; - tfree(tsConfig.dnodeEps); - tsConfig.dnodeEps = tmp; + tfree(tsDnode.dnodeEps); + tsDnode.dnodeEps = tmp; } - if (tsConfig.dnodeEps != data) { - memcpy(tsConfig.dnodeEps, data, size); + if (tsDnode.dnodeEps != pEps) { + memcpy(tsDnode.dnodeEps, pEps, size); } - tsConfig.mnodeEpSetForPeer.inUse = 0; - tsConfig.mnodeEpSetForShell.inUse = 0; - int32_t index = 0; - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + tsDnode.mnodeEpSetForPeer.inUse = 0; + tsDnode.mnodeEpSetForShell.inUse = 0; + + int32_t mIndex = 0; + for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; if (!ep->isMnode) continue; - if (index >= TSDB_MAX_REPLICA) continue; - strcpy(tsConfig.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn); - strcpy(tsConfig.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn); - tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort; - tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort; - index++; + if (mIndex >= TSDB_MAX_REPLICA) continue; + strcpy(tsDnode.mnodeEpSetForShell.fqdn[mIndex], ep->dnodeFqdn); + strcpy(tsDnode.mnodeEpSetForPeer.fqdn[mIndex], ep->dnodeFqdn); + tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort; + tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort + tsDnodeDnodePort; + mIndex++; } - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; - taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); + for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; + taosHashPut(tsDnode.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); } dnodePrintEps(); } -static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { +static bool dnodeIsEpChanged(int32_t dnodeId, char *epStr) { bool changed = false; + pthread_mutex_lock(&tsDnode.mutex); - pthread_mutex_lock(&tsConfig.mutex); - - SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t)); - if (ep != NULL) { + SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t)); + if (pEp != NULL) { char epSaved[TSDB_EP_LEN + 1]; - snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); - changed = strcmp(epstr, epSaved) != 0; - tstrncpy(epstr, epSaved, TSDB_EP_LEN); + snprintf(epSaved, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort); + changed = strcmp(epStr, epSaved) != 0; } - pthread_mutex_unlock(&tsConfig.mutex); - + pthread_mutex_unlock(&tsDnode.mutex); return changed; } @@ -166,101 +196,101 @@ static int32_t dnodeReadEps() { cJSON *root = NULL; FILE *fp = NULL; - fp = fopen(tsConfig.file, "r"); + fp = fopen(tsDnode.file, "r"); if (!fp) { - dDebug("file %s not exist", tsConfig.file); + dDebug("file %s not exist", tsDnode.file); goto PRASE_EPS_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", tsConfig.file); + dError("failed to read %s since content is null", tsDnode.file); goto PRASE_EPS_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", tsConfig.file); + dError("failed to read %s since invalid json format", tsDnode.file); goto PRASE_EPS_OVER; } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s since dnodeId not found", tsConfig.file); + dError("failed to read %s since dnodeId not found", tsDnode.file); goto PRASE_EPS_OVER; } - tsConfig.dnodeId = atoi(dnodeId->valuestring); + tsDnode.dnodeId = atoi(dnodeId->valuestring); - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_String) { - dError("failed to read %s since dropped not found", tsConfig.file); + cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); + if (!clusterId || clusterId->type != cJSON_String) { + dError("failed to read %s since clusterId not found", tsDnode.file); goto PRASE_EPS_OVER; } - tsConfig.dropped = atoi(dropped->valuestring); + tsDnode.clusterId = atoll(clusterId->valuestring); - cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); - if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", tsConfig.file); + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_String) { + dError("failed to read %s since dropped not found", tsDnode.file); goto PRASE_EPS_OVER; } - tsConfig.clusterId = atoll(clusterId->valuestring); + tsDnode.dropped = atoi(dropped->valuestring); cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", tsConfig.file); + dError("failed to read %s since dnodeInfos not found", tsDnode.file); goto PRASE_EPS_OVER; } int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); if (dnodeInfosSize <= 0) { - dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize); + dError("failed to read %s since dnodeInfos size:%d invalid", tsDnode.file, dnodeInfosSize); goto PRASE_EPS_OVER; } - tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); - if (tsConfig.dnodeEps == NULL) { + tsDnode.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + if (tsDnode.dnodeEps == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); goto PRASE_EPS_OVER; } - tsConfig.dnodeEps->dnodeNum = dnodeInfosSize; + tsDnode.dnodeEps->dnodeNum = dnodeInfosSize; for (int32_t i = 0; i < dnodeInfosSize; ++i) { cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); if (dnodeInfo == NULL) break; - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + SDnodeEp *pEp = &tsDnode.dnodeEps->dnodeEps[i]; cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s, dnodeId not found", tsConfig.file); + dError("failed to read %s, dnodeId not found", tsDnode.file); goto PRASE_EPS_OVER; } - ep->dnodeId = atoi(dnodeId->valuestring); + pEp->dnodeId = atoi(dnodeId->valuestring); cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); if (!isMnode || isMnode->type != cJSON_String) { - dError("failed to read %s, isMnode not found", tsConfig.file); + dError("failed to read %s, isMnode not found", tsDnode.file); goto PRASE_EPS_OVER; } - ep->isMnode = atoi(isMnode->valuestring); + pEp->isMnode = atoi(isMnode->valuestring); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", tsConfig.file); + dError("failed to read %s, dnodeFqdn not found", tsDnode.file); goto PRASE_EPS_OVER; } - tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); + tstrncpy(pEp->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); if (!dnodePort || dnodePort->type != cJSON_String) { - dError("failed to read %s, dnodePort not found", tsConfig.file); + dError("failed to read %s, dnodePort not found", tsDnode.file); goto PRASE_EPS_OVER; } - ep->dnodePort = atoi(dnodePort->valuestring); + pEp->dnodePort = atoi(dnodePort->valuestring); } - dInfo("succcessed to read file %s", tsConfig.file); + dInfo("succcessed to read file %s", tsDnode.file); dnodePrintEps(); PRASE_EPS_OVER: @@ -268,21 +298,21 @@ PRASE_EPS_OVER: if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); - if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) { - dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp); + if (dnodeIsEpChanged(tsDnode.dnodeId, tsLocalEp)) { + dError("localEp %s different with %s and need reconfigured", tsLocalEp, tsDnode.file); return -1; } - dnodeResetEps(tsConfig.dnodeEps); + dnodeResetEps(tsDnode.dnodeEps); terrno = 0; return 0; } static int32_t dnodeWriteEps() { - FILE *fp = fopen(tsConfig.file, "w"); + FILE *fp = fopen(tsDnode.file, "w"); if (!fp) { - dError("failed to write %s since %s", tsConfig.file, strerror(errno)); + dError("failed to write %s since %s", tsDnode.file, strerror(errno)); return -1; } @@ -291,17 +321,17 @@ static int32_t dnodeWriteEps() { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsConfig.dnodeId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsConfig.dropped); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsDnode.dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsDnode.clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsDnode.dropped); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); - for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) { - SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i]; + for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId); len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode); len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort); - if (i < tsConfig.dnodeEps->dnodeNum - 1) { + if (i < tsDnode.dnodeEps->dnodeNum - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); @@ -315,150 +345,76 @@ static int32_t dnodeWriteEps() { free(content); terrno = 0; - dInfo("successed to write %s", tsConfig.file); + dInfo("successed to write %s", tsDnode.file); return 0; } -int32_t dnodeInitConfig() { - tsConfig.dnodeId = 0; - tsConfig.dropped = 0; - tsConfig.clusterId = 0; - tsConfig.dnodeEps = NULL; - snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir); - pthread_mutex_init(&tsConfig.mutex, NULL); - - tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (tsConfig.dnodeHash == NULL) return -1; - - int32_t ret = dnodeReadEps(); - if (ret == 0) { - dInfo("dnode eps is initialized"); - } - - return ret; -} - -void dnodeCleanupConfig() { - pthread_mutex_lock(&tsConfig.mutex); - - if (tsConfig.dnodeEps != NULL) { - free(tsConfig.dnodeEps); - tsConfig.dnodeEps = NULL; - } - - if (tsConfig.dnodeHash) { - taosHashCleanup(tsConfig.dnodeHash); - tsConfig.dnodeHash = NULL; - } - - pthread_mutex_unlock(&tsConfig.mutex); - pthread_mutex_destroy(&tsConfig.mutex); -} - -void dnodeUpdateDnodeEps(SDnodeEps *data) { - if (data == NULL || data->dnodeNum <= 0) return; - - pthread_mutex_lock(&tsConfig.mutex); - - if (data->dnodeNum != tsConfig.dnodeEps->dnodeNum) { - dnodeResetEps(data); - dnodeWriteEps(); - } else { - int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); - if (memcmp(tsConfig.dnodeEps, data, size) != 0) { - dnodeResetEps(data); - dnodeWriteEps(); - } - } - - pthread_mutex_unlock(&tsConfig.mutex); -} - -void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { - pthread_mutex_lock(&tsConfig.mutex); - - SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t)); - if (ep != NULL) { - if (port) *port = ep->dnodePort; - if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); - if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); - } - - pthread_mutex_unlock(&tsConfig.mutex); -} - -void dnodeUpdateCfg(SDnodeCfg *data) { - if (tsConfig.dnodeId != 0 && !data->dropped) return; - - pthread_mutex_lock(&tsConfig.mutex); - - tsConfig.dnodeId = data->dnodeId; - tsConfig.clusterId = data->clusterId; - tsConfig.dropped = data->dropped; - dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId); - - dnodeWriteEps(); - pthread_mutex_unlock(&tsConfig.mutex); -} - -int32_t dnodeGetDnodeId() { - int32_t dnodeId = 0; - pthread_mutex_lock(&tsConfig.mutex); - dnodeId = tsConfig.dnodeId; - pthread_mutex_unlock(&tsConfig.mutex); - return dnodeId; -} - -int64_t dnodeGetClusterId() { - int64_t clusterId = 0; - pthread_mutex_lock(&tsConfig.mutex); - clusterId = tsConfig.clusterId; - pthread_mutex_unlock(&tsConfig.mutex); - return clusterId; -} - -static struct { - pthread_t *threadId; - bool threadStop; - uint32_t rebootTime; -} tsDnode; - static void dnodeSendStatusMsg() { - int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SStatusMsg *pStatus = rpcMallocCont(contLen); if (pStatus == NULL) { dError("failed to malloc status message"); return; } - pStatus->version = htonl(tsVersion); + pStatus->sversion = htonl(tsVersion); pStatus->dnodeId = htonl(dnodeGetDnodeId()); - tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); pStatus->clusterId = htobe64(dnodeGetClusterId()); - pStatus->lastReboot = htonl(tsDnode.rebootTime); + pStatus->rebootTime = htonl(tsDnode.rebootTime); pStatus->numOfCores = htonl(tsNumOfCores); - pStatus->diskAvailable = tsAvailDataDirGB; + tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); - // fill cluster cfg parameters pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval); pStatus->clusterCfg.checkTime = 0; - tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64); - char timestr[32] = "1970-01-01 00:00:00.00"; - (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, TSDB_TIMEZONE_LEN); tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN); tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN); + char timestr[32] = "1970-01-01 00:00:00.00"; + (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - // vnodeGetStatus(NULL, pStatus); - // contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); - // pStatus->openVnodes = htons(pStatus->openVnodes); - - SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; + dnodeGetVnodes(&pStatus->vnodeLoads); + contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.vnodeNum * sizeof(SVnodeLoad); + SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; dnodeSendMsgToMnode(&rpcMsg); } -void dnodeProcessStatusRsp(SRpcMsg *pMsg) { - dTrace("status rsp is received, code:%s", tstrerror(pMsg->code)); +static void dnodeUpdateCfg(SDnodeCfg *pCfg) { + if (tsDnode.dnodeId == 0) return; + if (tsDnode.dropped) return; + + pthread_mutex_lock(&tsDnode.mutex); + + tsDnode.dnodeId = pCfg->dnodeId; + tsDnode.clusterId = pCfg->clusterId; + tsDnode.dropped = pCfg->dropped; + dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, pCfg->dnodeId, pCfg->clusterId); + + dnodeWriteEps(); + pthread_mutex_unlock(&tsDnode.mutex); +} + +static void dnodeUpdateDnodeEps(SDnodeEps *pEps) { + if (pEps == NULL || pEps->dnodeNum <= 0) return; + + pthread_mutex_lock(&tsDnode.mutex); + + if (pEps->dnodeNum != tsDnode.dnodeEps->dnodeNum) { + dnodeResetEps(pEps); + dnodeWriteEps(); + } else { + int32_t size = pEps->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); + if (memcmp(tsDnode.dnodeEps, pEps, size) != 0) { + dnodeResetEps(pEps); + dnodeWriteEps(); + } + } + + pthread_mutex_unlock(&tsDnode.mutex); +} + +static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { if (pMsg->code != TSDB_CODE_SUCCESS) return; SStatusRsp *pStatusRsp = pMsg->pCont; @@ -466,25 +422,40 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) { SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->clusterId = htobe64(pCfg->clusterId); - pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); - pCfg->numOfDnodes = htonl(pCfg->numOfDnodes); dnodeUpdateCfg(pCfg); - if (pCfg->dropped) { - dError("status rsp is received, and set dnode to drop status"); - return; + if (pCfg->dropped) return; + + SDnodeEps *pEps = &pStatusRsp->dnodeEps; + pEps->dnodeNum = htonl(pEps->dnodeNum); + for (int32_t i = 0; i < pEps->dnodeNum; ++i) { + pEps->dnodeEps[i].dnodeId = htonl(pEps->dnodeEps[i].dnodeId); + pEps->dnodeEps[i].dnodePort = htons(pEps->dnodeEps[i].dnodePort); } - // vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); + dnodeUpdateDnodeEps(pEps); +} - SDnodeEps *eps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess)); - eps->dnodeNum = htonl(eps->dnodeNum); - for (int32_t i = 0; i < eps->dnodeNum; ++i) { - eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId); - eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort); - } +static void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { + SCfgDnodeMsg *pCfg = pMsg->pCont; - dnodeUpdateDnodeEps(eps); + int32_t code = taosCfgDynamicOptions(pCfg->config); + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} + +static void dnodeProcessStartupReq(SRpcMsg *pMsg) { + dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); + + SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); + dnodeGetStartup(pStep); + + dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); + + SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); } static void *dnodeThreadRoutine(void *param) { @@ -496,14 +467,34 @@ static void *dnodeThreadRoutine(void *param) { } int32_t dnodeInitDnode() { - tsDnode.threadStop = false; + tsDnode.dnodeId = 0; + tsDnode.clusterId = 0; + tsDnode.dnodeEps = NULL; + snprintf(tsDnode.file, sizeof(tsDnode.file), "%s/dnode.json", tsDnodeDir); tsDnode.rebootTime = taosGetTimestampSec(); + tsDnode.dropped = 0; + pthread_mutex_init(&tsDnode.mutex, NULL); + tsDnode.threadStop = false; + + tsDnode.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsDnode.dnodeHash == NULL) { + dError("failed to init dnode hash"); + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL); if (tsDnode.threadId == NULL) { - return -1; + dError("failed to init dnode thread"); + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + + int32_t code = dnodeReadEps(); + if (code != 0) { + dError("failed to read dnode endpoint file since %s", tstrerror(code)); + return code; } - dInfo("dnode msg is initialized"); + dInfo("dnode-dnode is initialized"); return 0; } @@ -514,29 +505,45 @@ void dnodeCleanupDnode() { tsDnode.threadId = NULL; } - dInfo("dnode msg is cleanuped"); -} + pthread_mutex_lock(&tsDnode.mutex); -void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { - SCfgDnodeMsg *pCfg = pMsg->pCont; + if (tsDnode.dnodeEps != NULL) { + free(tsDnode.dnodeEps); + tsDnode.dnodeEps = NULL; + } - int32_t code = taosCfgDynamicOptions(pCfg->config); + if (tsDnode.dnodeHash) { + taosHashCleanup(tsDnode.dnodeHash); + tsDnode.dnodeHash = NULL; + } - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_destroy(&tsDnode.mutex); - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + dInfo("dnode-dnode is cleaned up"); } -void dnodeProcessStartupReq(SRpcMsg *pMsg) { - dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); - - SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); - dnodeGetStartup(pStep); - - dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); - - SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)}; - rpcSendResponse(&rpcRsp); - rpcFreeCont(pMsg->pCont); +void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + int32_t msgType = pMsg->msgType; + + if (msgType == TSDB_MSG_TYPE_STATUS_RSP && pEpSet) { + dnodeUpdateMnodeEpSet(pEpSet); + } + + switch (msgType) { + case TSDB_MSG_TYPE_NETWORK_TEST: + dnodeProcessStartupReq(pMsg); + break; + case TSDB_MSG_TYPE_CONFIG_DNODE_IN: + dnodeProcessConfigDnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_STATUS_RSP: + dnodeProcessStatusRsp(pMsg); + break; + default: + dError("RPC %p, %s not processed", pMsg->handle, taosMsg[msgType]); + SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_MSG_NOT_PROCESSED}; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + } } diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index 6019fbd1f6..6346b3386f 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -21,7 +21,7 @@ int32_t dnodeInitMnode() { SMnodePara para; - para.fp.GetDnodeEp = dnodeGetEp; + para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendRedirectMsg = dnodeSendRedirectMsg; @@ -59,4 +59,11 @@ void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); +} + +void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + mnodeProcessMsg(pMsg); + // tsDnode.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq; + + // tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessDropMnodeReq; } \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index 39747d4350..222d59ba55 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -26,9 +26,6 @@ #include "dnodeVnodes.h" #include "mnode.h" #include "vnode.h" - -typedef void (*MsgFp)(SRpcMsg *pMsg); - static struct { void *serverRpc; void *clientRpc; @@ -38,88 +35,88 @@ static struct { static void dnodeInitMsgFp() { // msg from client to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; + tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg; // msg from client to mnode - tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg; // message from mnode to dnode - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = vnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq; - tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = NULL; - tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = NULL; - tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = mnodeProcessMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dnodeProcessDnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dnodeProcessMnodeMsg; // message from dnode to mnode - tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = NULL; - tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = NULL; - tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; - tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dnodeProcessDnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_GRANT] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dnodeProcessDnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_STATUS] = dnodeProcessMnodeMsg; + tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg; } static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { @@ -127,7 +124,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - dnodeProcessStartupReq(pMsg); + dnodeProcessDnodeMsg(pMsg, pEpSet); return; } @@ -148,7 +145,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { MsgFp fp = tsTrans.msgFp[msgType]; if (fp != NULL) { dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]); - (*fp)(pMsg); + (*fp)(pMsg, pEpSet); } else { dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]); rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; @@ -196,14 +193,10 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { return; } - if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeEps(pEpSet); - } - MsgFp fp = tsTrans.msgFp[msgType]; if (fp != NULL) { - dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); - (*fp)(pMsg); + dTrace("RPC %p, peer rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code)); + (*fp)(pMsg, pEpSet); } else { dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); } @@ -270,7 +263,7 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { MsgFp fp = tsTrans.msgFp[msgType]; if (fp != NULL) { dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]); - (*fp)(pMsg); + (*fp)(pMsg, pEpSet); } else { dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]); rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; @@ -283,13 +276,13 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsT void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { SRpcEpSet epSet = {0}; - dnodeGetEpSetForPeer(&epSet); + dnodeGetMnodeEpSetForPeer(&epSet); dnodeSendMsgToDnode(&epSet, rpcMsg); } static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; - dnodeGetEpSetForPeer(&epSet); + dnodeGetMnodeEpSetForPeer(&epSet); rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); } @@ -303,7 +296,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pMsg; rpcMsg.contLen = sizeof(SAuthMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH; + rpcMsg.msgType = TSDB_MSG_TYPE_AUTH; dDebug("user:%s, send auth msg to mnodes", user); SRpcMsg rpcRsp = {0}; diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 765bd6fc22..f9032b419e 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -19,4 +19,8 @@ int32_t dnodeInitVnodes() { return vnodeInit(); } -void dnodeCleanupVnodes() { vnodeCleanup(); } \ No newline at end of file +void dnodeCleanupVnodes() { vnodeCleanup(); } + +void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } + +void dnodeGetVnodes(SVnodeLoads *pVloads) {} \ No newline at end of file diff --git a/source/dnode/mnode/inc/mnodeDef.h b/source/dnode/mnode/inc/mnodeDef.h index 2670432594..33606e8ee2 100644 --- a/source/dnode/mnode/inc/mnodeDef.h +++ b/source/dnode/mnode/inc/mnodeDef.h @@ -120,7 +120,7 @@ typedef struct SDnodeObj { int64_t createdTime; int64_t updateTime; int64_t lastAccess; - int64_t lastReboot; // time stamp for last reboot + int64_t rebootTime; // time stamp for last reboot char fqdn[TSDB_FQDN_LEN]; char ep[TSDB_EP_LEN]; uint16_t port; diff --git a/source/dnode/mnode/src/mnodeWorker.c b/source/dnode/mnode/src/mnodeWorker.c index a477a11f7c..5ac44a3a06 100644 --- a/source/dnode/mnode/src/mnodeWorker.c +++ b/source/dnode/mnode/src/mnodeWorker.c @@ -171,12 +171,12 @@ static void mnodeInitMsgFp() { // tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessTableCfgMsg; // tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeDispatchToPeerQueue; // tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessVnodeCfgMsg; -// tsMworker.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeDispatchToPeerQueue; -// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessAuthMsg; -// // tsMworker.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeDispatchToPeerQueue; -// // tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_GRANT] = grantProcessMsgInMgmt; -// tsMworker.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeDispatchToPeerQueue; -// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessDnodeStatusMsg; +// tsMworker.msgFp[TSDB_MSG_TYPE_AUTH] = mnodeDispatchToPeerQueue; +// tsMworker.peerReqFp[TSDB_MSG_TYPE_AUTH] = mnodeProcessAuthMsg; +// // tsMworker.msgFp[TSDB_MSG_TYPE_GRANT] = mnodeDispatchToPeerQueue; +// // tsMworker.peerReqFp[TSDB_MSG_TYPE_GRANT] = grantProcessMsgInMgmt; +// tsMworker.msgFp[TSDB_MSG_TYPE_STATUS] = mnodeDispatchToPeerQueue; +// tsMworker.peerReqFp[TSDB_MSG_TYPE_STATUS] = mnodeProcessDnodeStatusMsg; // // peer rsp // tsMworker.msgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeDispatchToPeerRspQueue; diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index eecde288b1..e047964b94 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -406,7 +406,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64 if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_SHOW_RETRIEVE || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_STABLE_VGROUP || type == TSDB_MSG_TYPE_TABLES_META || type == TSDB_MSG_TYPE_TABLE_META - || type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE) + || type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE) pContext->connType = RPC_CONN_TCPC; pContext->rid = taosAddRef(tsRpcRefId, pContext); -- GitLab