提交 b95df0b8 编写于 作者: S Shengliang Guan

TD-2153

......@@ -68,6 +68,9 @@ typedef struct {
typedef struct {
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[];
} SPeersStatus;
......
......@@ -50,7 +50,7 @@ static int32_t tsSyncRefId = -1;
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer);
static void syncCheckPeerConnection(void *param, void *tmrId);
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack);
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
static void syncProcessBrokenLink(void *param);
static int32_t syncProcessPeerMsg(void *param, void *buffer);
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
......@@ -73,6 +73,28 @@ char* syncRole[] = {
"master"
};
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP
} ESyncStatusType;
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
int32_t syncInit() {
SPoolInfo info;
......@@ -524,7 +546,7 @@ void syncBroadcastStatus(SSyncNode *pNode) {
for (int32_t i = 0; i < pNode->replica; ++i) {
if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId());
}
}
......@@ -891,14 +913,14 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status msg is received, self:%s sver:%" PRIu64 " peer:%s sver:%" PRIu64 ", ack:%d", pPeer->id,
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
sDebug("%s, status msg is received, self:%s sver:%" PRIu64 " peer:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id,
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
pPeer->version = pPeersStatus->version;
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
if (pPeersStatus->ack) {
syncSendPeersStatusMsgToPeer(pPeer, 0);
syncSendPeersStatusMsgToPeer(pPeer, 0, pPeersStatus->type + 1, pPeersStatus->tranId);
}
}
......@@ -956,7 +978,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0};
......@@ -971,6 +993,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
pPeersStatus->version = nodeVersion;
pPeersStatus->role = nodeRole;
pPeersStatus->ack = ack;
pPeersStatus->type = type;
pPeersStatus->tranId = tranId;
for (int32_t i = 0; i < pNode->replica; ++i) {
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
......@@ -979,8 +1003,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
int32_t retLen = write(pPeer->peerFd, msg, statusMsgLen);
if (retLen == statusMsgLen) {
sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role],
pPeersStatus->version, pPeersStatus->ack);
sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, syncRole[pPeersStatus->role],
pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
} else {
sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer);
......@@ -995,7 +1019,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
taosTmrStopA(&pPeer->timer);
if (pPeer->peerFd >= 0) {
sDebug("%s, send role version to peer", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_SETUP_CONN, syncGenTranId());
return;
}
......@@ -1108,7 +1132,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册