提交 7bcfb460 编写于 作者: S Shengliang Guan

minor changes

上级 7392e731
......@@ -84,7 +84,7 @@ void mnodeUnDeploy();
*
* @return Server status.
*/
EMnStatus mnodeIsServing();
EMnStatus mnodeGetStatus();
typedef struct {
int64_t numOfDnode;
......
......@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_WORKER_H
#define _TD_UTIL_WORKER_H
#include "tqueue.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -235,7 +235,7 @@ static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
}
if (mnodeIsServing()) return 0;
if (mnodeGetStatus() == MN_STATUS_READY) return 0;
return mnodeDeploy();
}
......
......@@ -257,7 +257,7 @@ static void* dnodeTelemThreadFp(void* param) {
if (r == 0) break;
if (r != ETIMEDOUT) continue;
if (mnodeIsServing()) {
if (mnodeGetStatus() == MN_STATUS_READY) {
dnodeSendTelemetryReport();
}
end.tv_sec += REPORT_INTERVAL;
......
......@@ -25,7 +25,7 @@ extern "C" {
tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId();
char *mnodeGetClusterId();
EMnStatus mnodeIsServing();
EMnStatus mnodeGetStatus();
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
......
......@@ -24,6 +24,8 @@ extern "C" {
int32_t mnodeInitMnode();
void mnodeCleanupMnode();
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect);
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect);
#ifdef __cplusplus
}
......
......@@ -25,6 +25,8 @@ extern "C" {
int32_t mnodeInitSync();
void mnodeCleanUpSync();
bool mnodeIsMaster();
#ifdef __cplusplus
}
#endif
......
......@@ -24,6 +24,9 @@ extern "C" {
int32_t mnodeInitWorker();
void mnodeCleanupWorker();
void mnodeProcessMsg(SRpcMsg *rpcMsg);
void mnodeSendRsp(SMnMsg *pMsg, int32_t code);
void mnodeReDispatchToWriteQueue(SMnMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -19,3 +19,6 @@
int32_t mnodeInitMnode() { return 0; }
void mnodeCleanupMnode() {}
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect) {}
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {}
\ No newline at end of file
......@@ -19,3 +19,5 @@
int32_t mnodeInitSync() { return 0; }
void mnodeCleanUpSync() {}
bool mnodeIsMaster() { return true; }
\ No newline at end of file
......@@ -15,7 +15,473 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnodeInt.h"
#include "tworker.h"
#include "tglobal.h"
#include "mnodeMnode.h"
#include "mnodeSdb.h"
#include "mnodeShow.h"
#include "mnodeSync.h"
#include "mnodeWorker.h"
int32_t mnodeInitWorker() { return 0; }
void mnodeCleanupWorker() {}
\ No newline at end of file
static struct {
SWorkerPool read;
SWorkerPool write;
SWorkerPool peerReq;
SWorkerPool peerRsp;
taos_queue readQ;
taos_queue writeQ;
taos_queue peerReqQ;
taos_queue peerRspQ;
int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
void (*peerRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
} tsMworker = {0};
static SMnMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) {
int32_t size = sizeof(SMnMsg) + pRpcMsg->contLen;
SMnMsg *pMsg = taosAllocateQitem(size);
pMsg->rpcMsg = *pRpcMsg;
pMsg->rpcMsg.pCont = pMsg->pCont;
pMsg->createdTime = taosGetTimestampSec();
memcpy(pMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen);
SRpcConnInfo connInfo = {0};
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) == 0) {
pMsg->pUser = sdbGetRow(MN_SDB_USER, connInfo.user);
}
if (pMsg->pUser == NULL) {
mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
taosFreeQitem(pMsg);
return NULL;
}
return pMsg;
}
static void mnodeCleanupMsg(SMnMsg *pMsg) {
if (pMsg == NULL) return;
if (pMsg->rpcMsg.pCont != pMsg->pCont) {
tfree(pMsg->rpcMsg.pCont);
}
taosFreeQitem(pMsg);
}
static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) {
mnodeSendRedirectMsg(pRpcMsg, true);
} else {
SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
if (pMsg == NULL) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
rpcSendResponse(&rpcRsp);
} else {
mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg);
}
}
rpcFreeCont(pRpcMsg->pCont);
}
void mnodeReDispatchToWriteQueue(SMnMsg *pMsg) {
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.writeQ == NULL) {
mnodeSendRedirectMsg(&pMsg->rpcMsg, true);
mnodeCleanupMsg(pMsg);
} else {
taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg);
}
}
static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) {
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.readQ == NULL) {
mnodeSendRedirectMsg(pRpcMsg, true);
} else {
SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
if (pMsg == NULL) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
rpcSendResponse(&rpcRsp);
} else {
mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.readQ, TAOS_QTYPE_RPC, pMsg);
}
}
rpcFreeCont(pRpcMsg->pCont);
}
static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
if (mnodeGetStatus() != MN_STATUS_READY || tsMworker.peerReqQ == NULL) {
mnodeSendRedirectMsg(pRpcMsg, false);
} else {
SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
if (pMsg == NULL) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
rpcSendResponse(&rpcRsp);
} else {
mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.peerReqQ, TAOS_QTYPE_RPC, pMsg);
}
}
rpcFreeCont(pRpcMsg->pCont);
}
void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) {
SMnMsg *pMsg = mnodeInitMsg(pRpcMsg);
if (pMsg == NULL) {
SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER};
rpcSendResponse(&rpcRsp);
} else {
mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType]);
taosWriteQitem(tsMworker.peerRspQ, TAOS_QTYPE_RPC, pMsg);
}
// rpcFreeCont(pRpcMsg->pCont);
}
static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) {
if (pMsg == NULL) return;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
mnodeReDispatchToWriteQueue(pMsg);
return;
}
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pMsg->rpcRsp.rsp,
.contLen = pMsg->rpcRsp.len,
.code = code,
};
rpcSendResponse(&rpcRsp);
mnodeCleanupMsg(pMsg);
}
void mnodeSendRsp(SMnMsg *pMsg, int32_t code) { mnodeSendRpcRsp(NULL, pMsg, 0, code); }
static void mnodeProcessPeerRspEnd(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) {
mnodeCleanupMsg(pMsg);
}
static void mnodeInitMsgFp() {
// // peer req
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessTableCfgMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessVnodeCfgMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessAuthMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeDispatchToPeerQueue;
// // tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_GRANT] = grantProcessMsgInMgmt;
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessDnodeStatusMsg;
// // peer rsp
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessCfgDnodeMsgRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessDropSuperTableRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessCreateChildTableRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessDropChildTableRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessAlterTableRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessCreateVnodeRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessAlterVnodeRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessCompactVnodeRsp;
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeDispatchToPeerRspQueue;
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessDropVnodeRsp;
// // read msg
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessHeartBeatMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessConnectMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessUseMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessTableMetaMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMultiTableMetaMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessSuperTableVgroupMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessShowMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessRetrieveMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeDispatchToReadQueue;
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessRetrieveFuncReq;
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeDispatchToWriteQueue;
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = acctProcessCreateAcctMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeDispatchToWriteQueue;
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = acctProcessDropAcctMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeDispatchToWriteQueue;
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = acctProcessAlterAcctMsg;
// // write msg
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessCreateUserMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessAlterUserMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessDropUserMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessCreateDnodeMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessDropDnodeMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessCfgDnodeMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessCreateDbMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessAlterDbMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessDropDbMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessSyncDbMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessCompactMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessCreateFuncMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessDropFuncMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeDispatchToWriteQueue;
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = tpProcessCreateTpMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeDispatchToWriteQueue;
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = tpProcessAlterTpMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeDispatchToWriteQueue;
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = tpProcessDropTpMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessCreateTableMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessDropTableMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessAlterTableMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = NULL;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessKillQueryMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessKillStreamMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeDispatchToWriteQueue;
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessKillConnectionMsg;
}
static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN;
}
if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT;
}
if (tsMworker.writeMsgFp[msgType] == NULL) {
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
}
return (*tsMworker.writeMsgFp[msgType])(pMsg);
}
static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) {
int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN;
}
if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
if (!epSet) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT;
}
if (tsMworker.readMsgFp[msgType] == NULL) {
mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
}
mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
return (*tsMworker.readMsgFp[msgType])(pMsg);
}
static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
int32_t msgType = pMsg->rpcMsg.msgType;
void * ahandle = pMsg->rpcMsg.ahandle;
if (pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN;
}
if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForPeer(epSet, true);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT;
}
if (tsMworker.peerReqFp[msgType] == NULL) {
mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
}
return (*tsMworker.peerReqFp[msgType])(pMsg);
}
static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) {
int32_t msgType = pMsg->rpcMsg.msgType;
SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
if (!mnodeIsMaster()) {
mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
return 0;
}
if (tsMworker.peerRspFp[msgType]) {
(*tsMworker.peerRspFp[msgType])(pRpcMsg);
} else {
mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
}
return 0;
}
int32_t mnodeInitWorker() {
mnodeInitMsgFp();
SWorkerPool *pPool = &tsMworker.write;
pPool->name = "mnode-write";
pPool->startFp = (ProcessStartFp)mnodeProcessWriteReq;
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL);
}
pPool = &tsMworker.read;
pPool->name = "mnode-read";
pPool->startFp = (ProcessStartFp)mnodeProcessReadReq;
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
pPool->min = 2;
pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2);
pPool->max = MAX(2, pPool->max);
pPool->max = MIN(4, pPool->max);
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
tsMworker.readQ = tWorkerAllocQueue(pPool, NULL);
}
pPool = &tsMworker.peerReq;
pPool->name = "mnode-peer-req";
pPool->startFp = (ProcessStartFp)mnodeProcessPeerReq;
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL);
}
pPool = &tsMworker.peerRsp;
pPool->name = "mnode-peer-rsp";
pPool->startFp = (ProcessStartFp)mnodeProcessPeerRsp;
pPool->endFp = (ProcessEndFp)mnodeProcessPeerRspEnd;
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL);
}
mInfo("mnode worker is initialized");
return 0;
}
void mnodeCleanupWorker() {
tWorkerFreeQueue(&tsMworker.write, tsMworker.writeQ);
tWorkerCleanup(&tsMworker.write);
tsMworker.writeQ = NULL;
tWorkerFreeQueue(&tsMworker.read, tsMworker.readQ);
tWorkerCleanup(&tsMworker.read);
tsMworker.readQ = NULL;
tWorkerFreeQueue(&tsMworker.peerReq, tsMworker.peerReqQ);
tWorkerCleanup(&tsMworker.peerReq);
tsMworker.peerReqQ = NULL;
tWorkerFreeQueue(&tsMworker.peerRsp, tsMworker.peerRspQ);
tWorkerCleanup(&tsMworker.peerRsp);
tsMworker.peerRspQ = NULL;
mInfo("mnode worker is closed");
}
void mnodeProcessMsg(SRpcMsg *pMsg) {
if (tsMworker.msgFp[pMsg->msgType]) {
(*tsMworker.msgFp[pMsg->msgType])(pMsg);
} else {
assert(0);
}
}
......@@ -51,7 +51,7 @@ int32_t mnodeGetDnodeId() { return tsMint.dnodeId; }
char *mnodeGetClusterId() { return tsMint.clusterId; }
EMnStatus mnodeIsServing() { return tsMint.state; }
EMnStatus mnodeGetStatus() { return tsMint.state; }
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) {
(*tsMint.fp.SendMsgToDnode)(epSet, rpcMsg);
......@@ -244,5 +244,3 @@ void mnodeCleanup() {
}
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; }
void mnodeProcessMsg(SRpcMsg *rpcMsg) {}
\ No newline at end of file
......@@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) {
}
code = (*pool->startFp)(ahandle, msg, qtype);
(*pool->endFp)(ahandle, msg, qtype, code);
if (pool->endFp) (*pool->endFp)(ahandle, msg, qtype, code);
}
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册