From 7bcfb460ce4f721897ba59ce5fb3122cc4855dbb Mon Sep 17 00:00:00 2001
From: Shengliang Guan
Date: Wed, 20 Oct 2021 09:06:08 +0800
Subject: [PATCH] minor changes

---
 include/server/mnode/mnode.h          |   2 +-
 include/util/tworker.h                |   2 +
 source/server/dnode/src/dnodeMain.c   |   2 +-
 source/server/dnode/src/dnodeTelem.c  |   2 +-
 source/server/mnode/inc/mnodeInt.h    |   2 +-
 source/server/mnode/inc/mnodeMnode.h  |   2 +
 source/server/mnode/inc/mnodeSync.h   |   2 +
 source/server/mnode/inc/mnodeWorker.h |   3 +
 source/server/mnode/src/mnodeMnode.c  |   5 +-
 source/server/mnode/src/mnodeSync.c   |   4 +-
 source/server/mnode/src/mnodeWorker.c | 472 +++++++++++++++++++++++++-
 source/server/mnode/src/mondeInt.c    |   4 +-
 source/util/src/tworker.c             |   2 +-
 13 files changed, 491 insertions(+), 13 deletions(-)

diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h
index 9eedb62b60..dab2b1e4ae 100644
--- a/include/server/mnode/mnode.h
+++ b/include/server/mnode/mnode.h
@@ -84,7 +84,7 @@ void mnodeUnDeploy();
  *
  * @return Server status.
  */
-EMnStatus mnodeIsServing();
+EMnStatus mnodeGetStatus();
 
 typedef struct {
   int64_t numOfDnode;
diff --git a/include/util/tworker.h b/include/util/tworker.h
index 367c1a24b9..47ad0bd9e7 100644
--- a/include/util/tworker.h
+++ b/include/util/tworker.h
@@ -16,6 +16,8 @@
 #ifndef _TD_UTIL_WORKER_H
 #define _TD_UTIL_WORKER_H
 
+#include "tqueue.h"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c
index 17f5c1f851..bd8dbe5994 100644
--- a/source/server/dnode/src/dnodeMain.c
+++ b/source/server/dnode/src/dnodeMain.c
@@ -235,7 +235,7 @@ static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
     dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
   }
 
-  if (mnodeIsServing()) return 0;
+  if (mnodeGetStatus() == MN_STATUS_READY) return 0;
   return mnodeDeploy();
 }
 
diff --git a/source/server/dnode/src/dnodeTelem.c b/source/server/dnode/src/dnodeTelem.c
index 7c87ea5f50..927a91b277 100644
--- a/source/server/dnode/src/dnodeTelem.c
+++ b/source/server/dnode/src/dnodeTelem.c
@@ -257,7 +257,7 @@ static void* dnodeTelemThreadFp(void* param) {
     if (r == 0) break;
     if (r != ETIMEDOUT) continue;
 
-    if (mnodeIsServing()) {
+    if (mnodeGetStatus() == MN_STATUS_READY) {
       dnodeSendTelemetryReport();
     }
     end.tv_sec += REPORT_INTERVAL;
diff --git a/source/server/mnode/inc/mnodeInt.h b/source/server/mnode/inc/mnodeInt.h
index 913e39ea5a..654822ce40 100644
--- a/source/server/mnode/inc/mnodeInt.h
+++ b/source/server/mnode/inc/mnodeInt.h
@@ -25,7 +25,7 @@ extern "C" {
 tmr_h mnodeGetTimer();
 int32_t mnodeGetDnodeId();
 char *mnodeGetClusterId();
-EMnStatus mnodeIsServing();
+EMnStatus mnodeGetStatus();
 
 void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
 void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
diff --git a/source/server/mnode/inc/mnodeMnode.h b/source/server/mnode/inc/mnodeMnode.h
index 445eed0481..cee96c7bf6 100644
--- a/source/server/mnode/inc/mnodeMnode.h
+++ b/source/server/mnode/inc/mnodeMnode.h
@@ -24,6 +24,8 @@ extern "C" {
 
 int32_t mnodeInitMnode();
 void mnodeCleanupMnode();
+void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect);
+void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect);
 
 #ifdef __cplusplus
 }
diff --git a/source/server/mnode/inc/mnodeSync.h b/source/server/mnode/inc/mnodeSync.h
index 217cdfe1ae..714d6ed1a8 100644
--- a/source/server/mnode/inc/mnodeSync.h
+++ b/source/server/mnode/inc/mnodeSync.h
@@ -25,6 +25,8 @@ extern "C" {
 int32_t mnodeInitSync();
 void mnodeCleanUpSync();
 
+bool mnodeIsMaster();
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/source/server/mnode/inc/mnodeWorker.h b/source/server/mnode/inc/mnodeWorker.h
index 8a4dfec8b2..7663df6559 100644
--- a/source/server/mnode/inc/mnodeWorker.h
+++ b/source/server/mnode/inc/mnodeWorker.h
@@ -24,6 +24,9 @@ extern "C" {
 
 int32_t mnodeInitWorker();
 void mnodeCleanupWorker();
+void mnodeProcessMsg(SRpcMsg *rpcMsg);
+void mnodeSendRsp(SMnMsg *pMsg, int32_t code);
+void mnodeReDispatchToWriteQueue(SMnMsg *pMsg);
 
 #ifdef __cplusplus
 }
diff --git a/source/server/mnode/src/mnodeMnode.c b/source/server/mnode/src/mnodeMnode.c
index ce3025315d..d2bcd25fc7 100644
--- a/source/server/mnode/src/mnodeMnode.c
+++ b/source/server/mnode/src/mnodeMnode.c
@@ -18,4 +18,7 @@
 #include "mnodeInt.h"
 
 int32_t mnodeInitMnode() { return 0; }
-void mnodeCleanupMnode() {}
\ No newline at end of file
+void mnodeCleanupMnode() {}
+
+void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect) {}
+void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {}
\ No newline at end of file
diff --git a/source/server/mnode/src/mnodeSync.c b/source/server/mnode/src/mnodeSync.c
index d1c64e2d13..c161bb971a 100644
--- a/source/server/mnode/src/mnodeSync.c
+++ b/source/server/mnode/src/mnodeSync.c
@@ -18,4 +18,6 @@
 #include "mnodeInt.h"
 
 int32_t mnodeInitSync() { return 0; }
-void mnodeCleanUpSync() {}
\ No newline at end of file
+void mnodeCleanUpSync() {}
+
+bool mnodeIsMaster() { return true; }
\ No newline at end of file
diff --git a/source/server/mnode/src/mnodeWorker.c b/source/server/mnode/src/mnodeWorker.c
index bcf24a0aae..84beddb2da 100644
--- a/source/server/mnode/src/mnodeWorker.c
+++ b/source/server/mnode/src/mnodeWorker.c
@@ -15,7 +15,473 @@
 
 #define _DEFAULT_SOURCE
 #include "os.h"
-#include "mnodeInt.h"
+#include "tworker.h"
+#include "tglobal.h"
+#include "mnodeMnode.h"
+#include "mnodeSdb.h"
+#include "mnodeShow.h"
+#include "mnodeSync.h"
+#include "mnodeWorker.h"
 
-int32_t mnodeInitWorker() { return 0; }
-void mnodeCleanupWorker() {}
\ No newline at end of file
+static struct {
+  SWorkerPool read;
+  SWorkerPool write;
+  SWorkerPool peerReq;
+  SWorkerPool peerRsp;
+  taos_queue readQ;
+  taos_queue writeQ;
+  taos_queue peerReqQ;
+  taos_queue peerRspQ;
+  int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
+  int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
+  int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
+  void (*peerRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
+  void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
+} tsMworker = {0};
+
+static SMnMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) {
+  int32_t size = sizeof(SMnMsg) + pRpcMsg->contLen;
+  SMnMsg *pMsg = taosAllocateQitem(size);
+
+  pMsg->rpcMsg = *pRpcMsg;
+  pMsg->rpcMsg.pCont = pMsg->pCont;
+  pMsg->createdTime = taosGetTimestampSec();
+  memcpy(pMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen);
+
+  SRpcConnInfo connInfo = {0};
+  if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) == 0) {
+    pMsg->pUser = sdbGetRow(MN_SDB_USER, connInfo.user);
+  }
+
+  if (pMsg->pUser == NULL) {
+    mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
+    taosFreeQitem(pMsg);
+    return NULL;
+  }
+
+  return pMsg;
+}
+
+static void mnodeCleanupMsg(SMnMsg *pMsg) {
+  if (pMsg == NULL) return;
+  if (pMsg->rpcMsg.pCont != pMsg->pCont) {
+    tfree(pMsg->rpcMsg.pCont);
+  }
+
+  taosFreeQitem(pMsg);
+}
+
+static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
+  if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) {
+    mnodeSendRedirectMsg(pRpcMsg, true);
+  } else {
+    SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
+    if (pMsg == NULL) {
+      SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
+      rpcSendResponse(&rpcRsp);
+    } else {
+      mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
+      taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg);
+    }
+  }
+
+  rpcFreeCont(pRpcMsg->pCont);
+}
+
+void mnodeReDispatchToWriteQueue(SMnMsg *pMsg) {
+  if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) {
+    mnodeSendRedirectMsg(&pMsg->rpcMsg, true);
+    mnodeCleanupMsg(pMsg);
+  } else {
+    taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg);
+  }
+}
+
+static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) {
+  if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.readQ == NULL) {
+    mnodeSendRedirectMsg(pRpcMsg, true);
+  } else {
+    SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
+    if (pMsg == NULL) {
+      SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
+      rpcSendResponse(&rpcRsp);
+    } else {
+      mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
+      taosWriteQitem(tsMworker.readQ, TAOS_QTYPE_RPC, pMsg);
+    }
+  }
+
+  rpcFreeCont(pRpcMsg->pCont);
+}
+
+static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
+  if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.peerReqQ == NULL) {
+    mnodeSendRedirectMsg(pRpcMsg, false);
+  } else {
+    SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
+    if (pMsg == NULL) {
+      SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
+      rpcSendResponse(&rpcRsp);
+    } else {
+      mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle,
+             taosMsg[pMsg->rpcMsg.msgType]);
+      taosWriteQitem(tsMworker.peerReqQ, TAOS_QTYPE_RPC, pMsg);
+    }
+  }
+
+  rpcFreeCont(pRpcMsg->pCont);
+}
+
+void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) {
+  SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
+  if (pMsg == NULL) {
+    SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
+    rpcSendResponse(&rpcRsp);
+  } else {
+    mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle,
+           taosMsg[pMsg->rpcMsg.msgType]);
+    taosWriteQitem(tsMworker.peerRspQ, TAOS_QTYPE_RPC, pMsg);
+  }
+
+  // rpcFreeCont(pRpcMsg->pCont);
+}
+
+static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) {
+  if (pMsg == NULL) return;
+  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
+  if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
+    mnodeReDispatchToWriteQueue(pMsg);
+    return;
+  }
+
+  SRpcMsg rpcRsp = {
+      .handle = pMsg->rpcMsg.handle,
+      .pCont = pMsg->rpcRsp.rsp,
+      .contLen = pMsg->rpcRsp.len,
+      .code = code,
+  };
+
+  rpcSendResponse(&rpcRsp);
+  mnodeCleanupMsg(pMsg);
+}
+
+void mnodeSendRsp(SMnMsg *pMsg, int32_t code) { mnodeSendRpcRsp(NULL, pMsg, 0, code); }
+
+static void mnodeProcessPeerRspEnd(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) {
+  mnodeCleanupMsg(pMsg);
+}
+
+static void mnodeInitMsgFp() {
+// // peer req
+// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue;
+// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessTableCfgMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeDispatchToPeerQueue;
+// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessVnodeCfgMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeDispatchToPeerQueue;
+// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessAuthMsg;
+// // tsMworker.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeDispatchToPeerQueue;
+// // tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_GRANT] = grantProcessMsgInMgmt;
+// tsMworker.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeDispatchToPeerQueue;
+// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessDnodeStatusMsg;
+
+// // peer rsp
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessCfgDnodeMsgRsp;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessDropSuperTableRsp;
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessCreateChildTableRsp;
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessDropChildTableRsp;
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessAlterTableRsp;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessCreateVnodeRsp;
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessAlterVnodeRsp;
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessCompactVnodeRsp;
+// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
+// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessDropVnodeRsp;
+
+// // read msg
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessHeartBeatMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessConnectMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessUseMsg;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessTableMetaMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMultiTableMetaMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessSuperTableVgroupMsg;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessShowMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessRetrieveMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeDispatchToReadQueue;
+// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessRetrieveFuncReq;
+
+// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeDispatchToWriteQueue;
+// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = acctProcessCreateAcctMsg;
+// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeDispatchToWriteQueue;
+// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = acctProcessDropAcctMsg;
+// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeDispatchToWriteQueue;
+// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = acctProcessAlterAcctMsg;
+
+// // write msg
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessCreateUserMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessAlterUserMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessDropUserMsg;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessCreateDnodeMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessDropDnodeMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessCfgDnodeMsg;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessCreateDbMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessAlterDbMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessDropDbMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessSyncDbMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessCompactMsg;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessCreateFuncMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessDropFuncMsg;
+
+// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeDispatchToWriteQueue;
+// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = tpProcessCreateTpMsg;
+// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeDispatchToWriteQueue;
+// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = tpProcessAlterTpMsg;
+// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeDispatchToWriteQueue;
+// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = tpProcessDropTpMsg;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessCreateTableMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessDropTableMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessAlterTableMsg;
+
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = NULL;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessKillQueryMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessKillStreamMsg;
+// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeDispatchToWriteQueue;
+// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessKillConnectionMsg;
+}
+
+static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
+  int32_t msgType = pMsg->rpcMsg.msgType;
+  void *ahandle = pMsg->rpcMsg.ahandle;
+
+  if (pMsg->rpcMsg.pCont == NULL) {
+    mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
+    return TSDB_CODE_MND_INVALID_MSG_LEN;
+  }
+
+  if (!mnodeIsMaster()) {
+    SMnRsp *rpcRsp = &pMsg->rpcRsp;
+    SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
+    mnodeGetMnodeEpSetForShell(epSet, true);
+    rpcRsp->rsp = epSet;
+    rpcRsp->len = sizeof(SRpcEpSet);
+
+    mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
+           taosMsg[msgType], epSet->numOfEps, epSet->inUse);
+
+    return TSDB_CODE_RPC_REDIRECT;
+  }
+
+  if (tsMworker.writeMsgFp[msgType] == NULL) {
+    mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
+    return TSDB_CODE_MND_MSG_NOT_PROCESSED;
+  }
+
+  return (*tsMworker.writeMsgFp[msgType])(pMsg);
+}
+
+static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) {
+  int32_t msgType = pMsg->rpcMsg.msgType;
+  void *ahandle = pMsg->rpcMsg.ahandle;
+
+  if (pMsg->rpcMsg.pCont == NULL) {
+    mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
+    return TSDB_CODE_MND_INVALID_MSG_LEN;
+  }
+
+  if (!mnodeIsMaster()) {
+    SMnRsp *rpcRsp = &pMsg->rpcRsp;
+    SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
+    if (!epSet) {
+      return TSDB_CODE_MND_OUT_OF_MEMORY;
+    }
+    mnodeGetMnodeEpSetForShell(epSet, true);
+    rpcRsp->rsp = epSet;
+    rpcRsp->len = sizeof(SRpcEpSet);
+
+    mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
+           epSet->numOfEps, epSet->inUse);
+    return TSDB_CODE_RPC_REDIRECT;
+  }
+
+  if (tsMworker.readMsgFp[msgType] == NULL) {
+    mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
+    return TSDB_CODE_MND_MSG_NOT_PROCESSED;
+  }
+
+  mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
+  return (*tsMworker.readMsgFp[msgType])(pMsg);
+}
+
+static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
+  int32_t msgType = pMsg->rpcMsg.msgType;
+  void * ahandle = pMsg->rpcMsg.ahandle;
+
+  if (pMsg->rpcMsg.pCont == NULL) {
+    mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
+    return TSDB_CODE_MND_INVALID_MSG_LEN;
+  }
+
+  if (!mnodeIsMaster()) {
+    SMnRsp *rpcRsp = &pMsg->rpcRsp;
+    SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
+    mnodeGetMnodeEpSetForPeer(epSet, true);
+    rpcRsp->rsp = epSet;
+    rpcRsp->len = sizeof(SRpcEpSet);
+
+    mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
+           taosMsg[msgType], epSet->numOfEps, epSet->inUse);
+
+    return TSDB_CODE_RPC_REDIRECT;
+  }
+
+  if (tsMworker.peerReqFp[msgType] == NULL) {
+    mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
+    return TSDB_CODE_MND_MSG_NOT_PROCESSED;
+  }
+
+  return (*tsMworker.peerReqFp[msgType])(pMsg);
+}
+
+static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) {
+  int32_t msgType = pMsg->rpcMsg.msgType;
+  SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
+
+  if (!mnodeIsMaster()) {
+    mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
+    return 0;
+  }
+
+  if (tsMworker.peerRspFp[msgType]) {
+    (*tsMworker.peerRspFp[msgType])(pRpcMsg);
+  } else {
+    mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
+  }
+
+  return 0;
+}
+
+int32_t mnodeInitWorker() {
+  mnodeInitMsgFp();
+
+  SWorkerPool *pPool = &tsMworker.write;
+  pPool->name = "mnode-write";
+  pPool->startFp = (ProcessStartFp)mnodeProcessWriteReq;
+  pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
+  pPool->min = 1;
+  pPool->max = 1;
+  if (tWorkerInit(pPool) != 0) {
+    return TSDB_CODE_MND_OUT_OF_MEMORY;
+  } else {
+    tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL);
+  }
+
+  pPool = &tsMworker.read;
+  pPool->name = "mnode-read";
+  pPool->startFp = (ProcessStartFp)mnodeProcessReadReq;
+  pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
+  pPool->min = 2;
+  pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2);
+  pPool->max = MAX(2, pPool->max);
+  pPool->max = MIN(4, pPool->max);
+  if (tWorkerInit(pPool) != 0) {
+    return TSDB_CODE_MND_OUT_OF_MEMORY;
+  } else {
+    tsMworker.readQ = tWorkerAllocQueue(pPool, NULL);
+  }
+
+  pPool = &tsMworker.peerReq;
+  pPool->name = "mnode-peer-req";
+  pPool->startFp = (ProcessStartFp)mnodeProcessPeerReq;
+  pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
+  pPool->min = 1;
+  pPool->max = 1;
+  if (tWorkerInit(pPool) != 0) {
+    return TSDB_CODE_MND_OUT_OF_MEMORY;
+  } else {
+    tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL);
+  }
+
+  pPool = &tsMworker.peerRsp;
+  pPool->name = "mnode-peer-rsp";
+  pPool->startFp = (ProcessStartFp)mnodeProcessPeerRsp;
+  pPool->endFp = (ProcessEndFp)mnodeProcessPeerRspEnd;
+  pPool->min = 1;
+  pPool->max = 1;
+  if (tWorkerInit(pPool) != 0) {
+    return TSDB_CODE_MND_OUT_OF_MEMORY;
+  } else {
+    tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL);
+  }
+
+  mInfo("mnode worker is initialized");
+  return 0;
+}
+
+void mnodeCleanupWorker() {
+  tWorkerFreeQueue(&tsMworker.write, tsMworker.writeQ);
+  tWorkerCleanup(&tsMworker.write);
+  tsMworker.writeQ = NULL;
+
+  tWorkerFreeQueue(&tsMworker.read, tsMworker.readQ);
+  tWorkerCleanup(&tsMworker.read);
+  tsMworker.readQ = NULL;
+
+  tWorkerFreeQueue(&tsMworker.peerReq, tsMworker.peerReqQ);
+  tWorkerCleanup(&tsMworker.peerReq);
+  tsMworker.peerReqQ = NULL;
+
+  tWorkerFreeQueue(&tsMworker.peerRsp, tsMworker.peerRspQ);
+  tWorkerCleanup(&tsMworker.peerRsp);
+  tsMworker.peerRspQ = NULL;
+
+  mInfo("mnode worker is closed");
+}
+
+void mnodeProcessMsg(SRpcMsg *pMsg) {
+  if (tsMworker.msgFp[pMsg->msgType]) {
+    (*tsMworker.msgFp[pMsg->msgType])(pMsg);
+  } else {
+    assert(0);
+  }
+}
diff --git a/source/server/mnode/src/mondeInt.c b/source/server/mnode/src/mondeInt.c
index 13dba2c944..41cf5f14b3 100644
--- a/source/server/mnode/src/mondeInt.c
+++ b/source/server/mnode/src/mondeInt.c
@@ -51,7 +51,7 @@ int32_t mnodeGetDnodeId() { return tsMint.dnodeId; }
 
 char *mnodeGetClusterId() { return tsMint.clusterId; }
 
-EMnStatus mnodeIsServing() { return tsMint.state; }
+EMnStatus mnodeGetStatus() { return tsMint.state; }
 
 void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) {
   (*tsMint.fp.SendMsgToDnode)(epSet, rpcMsg);
@@ -244,5 +244,3 @@ void mnodeCleanup() {
 }
 
 int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; }
-
-void mnodeProcessMsg(SRpcMsg *rpcMsg) {}
\ No newline at end of file
diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c
index 9e21583895..abed265e0b 100644
--- a/source/util/src/tworker.c
+++ b/source/util/src/tworker.c
@@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) {
     }
 
     code = (*pool->startFp)(ahandle, msg, qtype);
-    (*pool->endFp)(ahandle, msg, qtype, code);
+    if (pool->endFp) (*pool->endFp)(ahandle, msg, qtype, code);
   }
 
   return NULL;
-- 
GitLab
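
Note on the tworker.c hunk: because tWorkerThreadFp now checks pool->endFp before calling it, a worker pool can be registered with only a startFp. Below is a minimal sketch of such a setup, using only the SWorkerPool fields and the tWorkerInit/tWorkerAllocQueue calls that appear in this patch; the pool name and the demoProcessReq handler are hypothetical and illustrate the call pattern only.

#include "tworker.h"

// Hypothetical handler; the signature mirrors the ProcessStartFp pattern used
// by mnodeProcessWriteReq/mnodeProcessReadReq in this patch.
static int32_t demoProcessReq(void *ahandle, void *pMsg, int32_t qtype) {
  // Process the dequeued item. The return code is only consumed by endFp,
  // which this pool deliberately leaves unset.
  return 0;
}

static SWorkerPool tsDemoPool;
static taos_queue  tsDemoQueue;

int32_t demoInitWorker() {
  SWorkerPool *pPool = &tsDemoPool;
  pPool->name = "demo-worker";
  pPool->startFp = (ProcessStartFp)demoProcessReq;
  pPool->endFp = NULL;  // permitted after this patch: the worker thread skips a NULL endFp
  pPool->min = 1;
  pPool->max = 1;
  if (tWorkerInit(pPool) != 0) return -1;

  tsDemoQueue = tWorkerAllocQueue(pPool, NULL);
  return 0;
}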