未验证 提交 cdfa8fc3 编写于 作者: L Liu Jicong 提交者: GitHub

solve 3.0 conflict (#8428)

add tq data structure
上级 009270aa
...@@ -21,11 +21,11 @@ ...@@ -21,11 +21,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeTrans.h" #include "dnodeTrans.h"
#include "dnodeEps.h" #include "dnodeMain.h"
#include "dnodeMsg.h" #include "dnodeMnodeEps.h"
#include "dnodeStatus.h"
#include "mnode.h" #include "mnode.h"
#include "vnode.h" #include "vnode.h"
#include "mnode.h"
typedef void (*RpcMsgFp)(SRpcMsg *pMsg); typedef void (*RpcMsgFp)(SRpcMsg *pMsg);
...@@ -97,8 +97,12 @@ static int32_t dnodeInitServer() { ...@@ -97,8 +97,12 @@ static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -139,11 +143,12 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -139,11 +143,12 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEps(pEpSet); dnodeUpdateMnodeFromPeer(pEpSet);
} }
RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
if (fp != NULL) { if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg); (*fp)(pMsg);
} else { } else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
......
...@@ -797,13 +797,11 @@ static void vnodeInitMsgFp() { ...@@ -797,13 +797,11 @@ static void vnodeInitMsgFp() {
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
//mq related end
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
//mq related
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
//mq related end //mq related end
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
} }
void vnodeProcessMsg(SRpcMsg *pMsg) { void vnodeProcessMsg(SRpcMsg *pMsg) {
......
...@@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
//fetch or register context //fetch or register context
tqFetchMsg(pHandle, pRead); tqFetchMsg(pHandle, pRead);
//judge mode, tail read or catch up read //judge mode, tail read or catch up read
/*int64_t lastVer = walLastVer(pVnode->wal);*/
//launch new query //launch new query
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册