提交 38f6f3b0 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/feature/raft-interface' into feature/dnode3

...@@ -24,10 +24,10 @@ extern "C" { ...@@ -24,10 +24,10 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "wal.h" #include "wal.h"
typedef uint32_t SyncNodeId; typedef int64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
typedef int64_t SyncIndex; typedef int64_t SyncIndex;
typedef uint64_t SyncTerm; typedef uint64_t SSyncTerm;
typedef enum { typedef enum {
TAOS_SYNC_ROLE_FOLLOWER = 0, TAOS_SYNC_ROLE_FOLLOWER = 0,
...@@ -41,9 +41,8 @@ typedef struct { ...@@ -41,9 +41,8 @@ typedef struct {
} SSyncBuffer; } SSyncBuffer;
typedef struct { typedef struct {
SyncNodeId nodeId; // node ID assigned by TDengine uint16_t nodePort; // node sync Port
uint16_t nodePort; // node sync Port char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo; } SNodeInfo;
typedef struct { typedef struct {
...@@ -53,9 +52,9 @@ typedef struct { ...@@ -53,9 +52,9 @@ typedef struct {
} SSyncCluster; } SSyncCluster;
typedef struct { typedef struct {
int32_t selfIndex; int32_t selfIndex;
int nNode; int nNode;
SyncNodeId* nodeId; SNodeInfo* node;
ESyncRole* role; ESyncRole* role;
} SNodesRole; } SNodesRole;
...@@ -85,8 +84,8 @@ typedef struct SSyncFSM { ...@@ -85,8 +84,8 @@ typedef struct SSyncFSM {
} SSyncFSM; } SSyncFSM;
typedef struct SSyncServerState { typedef struct SSyncServerState {
SyncNodeId voteFor; SNodeInfo voteFor;
SyncTerm term; SSyncTerm term;
} SSyncServerState; } SSyncServerState;
typedef struct SStateManager { typedef struct SStateManager {
...@@ -106,8 +105,8 @@ typedef struct { ...@@ -106,8 +105,8 @@ typedef struct {
twalh walHandle; twalh walHandle;
SyncIndex snapshotIndex; // initial version SyncIndex snapshotIndex;
SSyncCluster syncCfg; // configuration from mgmt SSyncCluster syncCfg;
SSyncFSM fsm; SSyncFSM fsm;
...@@ -122,7 +121,11 @@ void syncStop(SyncNodeId); ...@@ -122,7 +121,11 @@ void syncStop(SyncNodeId);
int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak); int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak);
extern int32_t syncDebugFlag; int32_t syncAddNode(SyncNodeId nodeId, const SNodeInfo *pNode);
int32_t syncRemoveNode(SyncNodeId nodeId, const SNodeInfo *pNode);
extern int32_t syncDebugFlag;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -96,8 +96,12 @@ static int32_t dnodeInitServer() { ...@@ -96,8 +96,12 @@ static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -143,6 +147,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -143,6 +147,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
if (fp != NULL) { if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg); (*fp)(pMsg);
} else { } else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
......
...@@ -812,13 +812,11 @@ static void vnodeInitMsgFp() { ...@@ -812,13 +812,11 @@ static void vnodeInitMsgFp() {
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
//mq related end
tsVnode.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVnode.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
//mq related
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
//mq related end //mq related end
tsVnode.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVnode.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
} }
void vnodeProcessMsg(SRpcMsg *pMsg) { void vnodeProcessMsg(SRpcMsg *pMsg) {
......
...@@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
//fetch or register context //fetch or register context
tqFetchMsg(pHandle, pRead); tqFetchMsg(pHandle, pRead);
//judge mode, tail read or catch up read //judge mode, tail read or catch up read
/*int64_t lastVer = walLastVer(pVnode->wal);*/
//launch new query //launch new query
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册