From cdfa8fc31c27ef25bba487253d0c9ebbbf45704d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 26 Oct 2021 10:09:07 +0800 Subject: [PATCH] solve 3.0 conflict (#8428) add tq data structure --- source/server/dnode/src/dnodeTrans.c | 17 +++++++++++------ source/server/vnode/src/vnodeMain.c | 6 ++---- source/server/vnode/src/vnodeReadMsg.c | 1 + 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 6d734932ff..6f4cb2d6eb 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -21,11 +21,11 @@ #define _DEFAULT_SOURCE #include "dnodeTrans.h" -#include "dnodeEps.h" -#include "dnodeMsg.h" +#include "dnodeMain.h" +#include "dnodeMnodeEps.h" +#include "dnodeStatus.h" #include "mnode.h" #include "vnode.h" -#include "mnode.h" typedef void (*RpcMsgFp)(SRpcMsg *pMsg); @@ -97,8 +97,12 @@ static int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -139,11 +143,12 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeEps(pEpSet); + dnodeUpdateMnodeFromPeer(pEpSet); } RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; if (fp != NULL) { + dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index da1c1d7235..f1834ffdbe 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -797,13 +797,11 @@ static void vnodeInitMsgFp() { tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; - //mq related end - tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; - //mq related tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; //mq related end + tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index b4070546c7..d4701ec4ce 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { //fetch or register context tqFetchMsg(pHandle, pRead); //judge mode, tail read or catch up read + /*int64_t lastVer = walLastVer(pVnode->wal);*/ //launch new query return 0; } -- GitLab