diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 309d5b1a751c29329f153639195a590607925e50..666a90c3cb2619546011edfc44753d9773847b8f 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -68,6 +68,9 @@ typedef struct { typedef struct { int8_t role; int8_t ack; + int8_t type; + int8_t reserved[3]; + uint16_t tranId; uint64_t version; SPeerStatus peersStatus[]; } SPeersStatus; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 5c56e90af63a40984e1df8e13043830155cfac7a..895b1cd098b6d7121e3845361fe436a678c10ee8 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -50,7 +50,7 @@ static int32_t tsSyncRefId = -1; static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer); static void syncCheckPeerConnection(void *param, void *tmrId); -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack); +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static void syncProcessBrokenLink(void *param); static int32_t syncProcessPeerMsg(void *param, void *buffer); static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); @@ -73,6 +73,28 @@ char* syncRole[] = { "master" }; +typedef enum { + SYNC_STATUS_BROADCAST, + SYNC_STATUS_BROADCAST_RSP, + SYNC_STATUS_SETUP_CONN, + SYNC_STATUS_SETUP_CONN_RSP, + SYNC_STATUS_EXCHANGE_DATA, + SYNC_STATUS_EXCHANGE_DATA_RSP +} ESyncStatusType; + +char *statusType[] = { + "broadcast", + "broadcast-rsp", + "setup-conn", + "setup-conn-rsp", + "exchange-data", + "exchange-data-rsp" +}; + +uint16_t syncGenTranId() { + return taosRand() & 0XFFFF; +} + int32_t syncInit() { SPoolInfo info; @@ -524,7 +546,7 @@ void syncBroadcastStatus(SSyncNode *pNode) { for (int32_t i = 0; i < pNode->replica; ++i) { if (i == pNode->selfIndex) continue; pPeer = pNode->peerInfo[i]; - syncSendPeersStatusMsgToPeer(pPeer, 1); + syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId()); } } @@ -891,14 +913,14 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus *pPeersStatus = (SPeersStatus *)cont; - sDebug("%s, status msg is received, self:%s sver:%" PRIu64 " peer:%s sver:%" PRIu64 ", ack:%d", pPeer->id, - syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); + sDebug("%s, status msg is received, self:%s sver:%" PRIu64 " peer:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, + syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]); pPeer->version = pPeersStatus->version; syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role); if (pPeersStatus->ack) { - syncSendPeersStatusMsgToPeer(pPeer, 0); + syncSendPeersStatusMsgToPeer(pPeer, 0, pPeersStatus->type + 1, pPeersStatus->tranId); } } @@ -956,7 +978,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { #define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { SSyncNode *pNode = pPeer->pSyncNode; char msg[statusMsgLen] = {0}; @@ -971,6 +993,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { pPeersStatus->version = nodeVersion; pPeersStatus->role = nodeRole; pPeersStatus->ack = ack; + pPeersStatus->type = type; + pPeersStatus->tranId = tranId; for (int32_t i = 0; i < pNode->replica; ++i) { pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; @@ -979,8 +1003,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { int32_t retLen = write(pPeer->peerFd, msg, statusMsgLen); if (retLen == statusMsgLen) { - sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role], - pPeersStatus->version, pPeersStatus->ack); + sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, syncRole[pPeersStatus->role], + pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]); } else { sDebug("%s, failed to send status msg, restart", pPeer->id); syncRestartConnection(pPeer); @@ -995,7 +1019,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); if (pPeer->peerFd >= 0) { sDebug("%s, send role version to peer", pPeer->id); - syncSendPeersStatusMsgToPeer(pPeer, 1); + syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_SETUP_CONN, syncGenTranId()); return; } @@ -1108,7 +1132,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); syncAddPeerRef(pPeer); sDebug("%s, ready to exchange data", pPeer->id); - syncSendPeersStatusMsgToPeer(pPeer, 1); + syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); } }