提交 5d8be741 编写于 作者: S Shengliang Guan

TD-1652

上级 170505d2
...@@ -65,6 +65,9 @@ typedef struct { ...@@ -65,6 +65,9 @@ typedef struct {
typedef struct { typedef struct {
int8_t role; int8_t role;
int8_t ack; int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version; uint64_t version;
SPeerStatus peersStatus[]; SPeerStatus peersStatus[];
} SPeersStatus; } SPeersStatus;
......
...@@ -48,7 +48,7 @@ static void * vgIdHash; ...@@ -48,7 +48,7 @@ static void * vgIdHash;
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer);
static void syncCheckPeerConnection(void *param, void *tmrId); static void syncCheckPeerConnection(void *param, void *tmrId);
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack); static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
static void syncProcessBrokenLink(void *param); static void syncProcessBrokenLink(void *param);
static int syncProcessPeerMsg(void *param, void *buffer); static int syncProcessPeerMsg(void *param, void *buffer);
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp);
...@@ -71,6 +71,28 @@ char* syncRole[] = { ...@@ -71,6 +71,28 @@ char* syncRole[] = {
"master" "master"
}; };
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP
} ESyncStatusType;
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
int32_t syncInit() { int32_t syncInit() {
SPoolInfo info; SPoolInfo info;
...@@ -528,7 +550,7 @@ void syncBroadcastStatus(SSyncNode *pNode) { ...@@ -528,7 +550,7 @@ void syncBroadcastStatus(SSyncNode *pNode) {
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
if (i == pNode->selfIndex) continue; if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId());
} }
} }
...@@ -866,6 +888,12 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -866,6 +888,12 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SWalHead * pHead = (SWalHead *)cont; SWalHead * pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
if (pHead->version != nodeVersion + 1) {
sError("%s, forward is received, ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64, pPeer->id, pHead->version,
nodeVersion);
syncRestartConnection(pPeer);
return;
}
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
...@@ -883,14 +911,15 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { ...@@ -883,14 +911,15 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont; SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d tranId:%u type:%s",
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); pPeer->id, syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version,
pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
pPeer->version = pPeersStatus->version; pPeer->version = pPeersStatus->version;
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role); syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
if (pPeersStatus->ack) { if (pPeersStatus->ack) {
syncSendPeersStatusMsgToPeer(pPeer, 0); syncSendPeersStatusMsgToPeer(pPeer, 0, pPeersStatus->type + 1, pPeersStatus->tranId);
} }
} }
...@@ -947,7 +976,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) { ...@@ -947,7 +976,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA #define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0}; char msg[statusMsgLen] = {0};
...@@ -962,6 +991,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { ...@@ -962,6 +991,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
pPeersStatus->version = nodeVersion; pPeersStatus->version = nodeVersion;
pPeersStatus->role = nodeRole; pPeersStatus->role = nodeRole;
pPeersStatus->ack = ack; pPeersStatus->ack = ack;
pPeersStatus->type = type;
pPeersStatus->tranId = tranId;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
...@@ -970,7 +1001,9 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { ...@@ -970,7 +1001,9 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
int retLen = write(pPeer->peerFd, msg, statusMsgLen); int retLen = write(pPeer->peerFd, msg, statusMsgLen);
if (retLen == statusMsgLen) { if (retLen == statusMsgLen) {
sDebug("%s, status msg is sent", pPeer->id); sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id,
syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId,
statusType[pPeersStatus->type]);
} else { } else {
sDebug("%s, failed to send status msg, restart", pPeer->id); sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
...@@ -985,7 +1018,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -985,7 +1018,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
sDebug("%s, send role version to peer", pPeer->id); sDebug("%s, send role version to peer", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_SETUP_CONN, syncGenTranId());
return; return;
} }
...@@ -1098,7 +1131,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { ...@@ -1098,7 +1131,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id); sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册