diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index dd2157655c2245acbe78a5723ebc7fa966992c09..454a2ecfa83e95ecd1c0ef3179e31013f1a5aebc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,10 +24,10 @@ extern "C" { #include "taosdef.h" #include "wal.h" -typedef uint32_t SyncNodeId; -typedef int32_t SyncGroupId; -typedef int64_t SyncIndex; -typedef uint64_t SyncTerm; +typedef int64_t SyncNodeId; +typedef int32_t SyncGroupId; +typedef int64_t SyncIndex; +typedef uint64_t SSyncTerm; typedef enum { TAOS_SYNC_ROLE_FOLLOWER = 0, @@ -41,9 +41,8 @@ typedef struct { } SSyncBuffer; typedef struct { - SyncNodeId nodeId; // node ID assigned by TDengine - uint16_t nodePort; // node sync Port - char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; typedef struct { @@ -53,9 +52,9 @@ typedef struct { } SSyncCluster; typedef struct { - int32_t selfIndex; - int nNode; - SyncNodeId* nodeId; + int32_t selfIndex; + int nNode; + SNodeInfo* node; ESyncRole* role; } SNodesRole; @@ -85,8 +84,8 @@ typedef struct SSyncFSM { } SSyncFSM; typedef struct SSyncServerState { - SyncNodeId voteFor; - SyncTerm term; + SNodeInfo voteFor; + SSyncTerm term; } SSyncServerState; typedef struct SStateManager { @@ -106,8 +105,8 @@ typedef struct { twalh walHandle; - SyncIndex snapshotIndex; // initial version - SSyncCluster syncCfg; // configuration from mgmt + SyncIndex snapshotIndex; + SSyncCluster syncCfg; SSyncFSM fsm; @@ -122,7 +121,11 @@ void syncStop(SyncNodeId); int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak); -extern int32_t syncDebugFlag; +int32_t syncAddNode(SyncNodeId nodeId, const SNodeInfo *pNode); + +int32_t syncRemoveNode(SyncNodeId nodeId, const SNodeInfo *pNode); + +extern int32_t syncDebugFlag; #ifdef __cplusplus } diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index e109e2ce9d1a67b081578bec20cfe68e11403628..4eba31ab68feb97576cf4104a0da1e84fb926ea8 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -96,8 +96,12 @@ static int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -143,6 +147,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; if (fp != NULL) { + dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); diff --git a/source/server/vnode/src/vnodeInt.c b/source/server/vnode/src/vnodeInt.c index 9c09a991e40d671d87279f981f6d63d36ead07e3..7ec5200e5d961aa86c87f58a41f85109a59e7aa1 100644 --- a/source/server/vnode/src/vnodeInt.c +++ b/source/server/vnode/src/vnodeInt.c @@ -812,13 +812,11 @@ static void vnodeInitMsgFp() { tsVnode.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; - //mq related end - tsVnode.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; - //mq related tsVnode.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; //mq related end + tsVnode.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVnode.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 419a1f73b697099793097435e8b66f9e42c369de..21ecde332646d0af23970b1e456b0379480aff9e 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { //fetch or register context tqFetchMsg(pHandle, pRead); //judge mode, tail read or catch up read + /*int64_t lastVer = walLastVer(pVnode->wal);*/ //launch new query return 0; }