提交 2171bf7a 编写于 作者: S Shengliang Guan

TD-2157

上级 b95df0b8
...@@ -341,12 +341,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -341,12 +341,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum);
syncRole[nodeRole]);
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, rid); taosReleaseRef(tsSyncRefId, rid);
return 0; return 0;
} }
...@@ -378,7 +376,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { ...@@ -378,7 +376,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
pFwdRsp->code = code; pFwdRsp->code = code;
int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
int32_t retLen = write(pPeer->peerFd, msg, msgLen); int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen);
if (retLen == msgLen) { if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
...@@ -391,6 +389,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { ...@@ -391,6 +389,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
taosReleaseRef(tsSyncRefId, rid); taosReleaseRef(tsSyncRefId, rid);
} }
#if 0
void syncRecover(int64_t rid) { void syncRecover(int64_t rid) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
...@@ -417,6 +416,7 @@ void syncRecover(int64_t rid) { ...@@ -417,6 +416,7 @@ void syncRecover(int64_t rid) {
taosReleaseRef(tsSyncRefId, rid); taosReleaseRef(tsSyncRefId, rid);
} }
#endif
int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
...@@ -532,7 +532,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -532,7 +532,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
int32_t checkMs = 100 + (pNode->vgId * 10) % 100; int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs; if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs;
sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs); sDebug("%s, check peer connection after %d ms", pPeer->id, checkMs);
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
...@@ -566,8 +566,6 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -566,8 +566,6 @@ static void syncChooseMaster(SSyncNode *pNode) {
int32_t index = -1; int32_t index = -1;
int32_t replica = pNode->replica; int32_t replica = pNode->replica;
sDebug("vgId:%d, choose master", pNode->vgId);
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++; onlineNum++;
...@@ -633,11 +631,11 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -633,11 +631,11 @@ static void syncChooseMaster(SSyncNode *pNode) {
static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int32_t onlineNum = 0; int32_t onlineNum = 0;
int32_t index = -1; int32_t masterIndex = -1;
int32_t replica = pNode->replica; int32_t replica = pNode->replica;
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t index = 0; index < pNode->replica; ++index) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { if (pNode->peerInfo[index]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++; onlineNum++;
} }
} }
...@@ -652,18 +650,17 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -652,18 +650,17 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
if (onlineNum <= replica * 0.5) { if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
// pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
} }
} else { } else {
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t index = 0; index < pNode->replica; ++index) {
SSyncPeer *pTemp = pNode->peerInfo[i]; SSyncPeer *pTemp = pNode->peerInfo[index];
if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue; if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue;
if (index < 0) { if (masterIndex < 0) {
index = i; masterIndex = index;
} else { // multiple masters, it shall not happen } else { // multiple masters, it shall not happen
if (i == pNode->selfIndex) { if (masterIndex == pNode->selfIndex) {
sError("%s, peer is master, work as slave instead", pTemp->id); sError("%s, peer is master, work as slave instead", pTemp->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
...@@ -672,77 +669,80 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -672,77 +669,80 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
} }
} }
SSyncPeer *pMaster = (index >= 0) ? pNode->peerInfo[index] : NULL; SSyncPeer *pMaster = (masterIndex >= 0) ? pNode->peerInfo[masterIndex] : NULL;
return pMaster; return pMaster;
} }
static int32_t syncValidateMaster(SSyncPeer *pPeer) { static int32_t syncValidateMaster(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int32_t code = 0; int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); sDebug("%s, peer has higher sver:%" PRIu64 ", restart all peer connections", pPeer->id, pPeer->version);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
code = -1; code = -1;
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t index = 0; index < pNode->replica; ++index) {
if (i == pNode->selfIndex) continue; if (index == pNode->selfIndex) continue;
syncRestartPeer(pNode->peerInfo[i]); syncRestartPeer(pNode->peerInfo[index]);
} }
} }
return code; return code;
} }
static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) { static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t newPeerRole) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int8_t peerOldRole = pPeer->role; int8_t oldPeerRole = pPeer->role;
int8_t selfOldRole = nodeRole; int8_t oldSelfRole = nodeRole;
int8_t i, syncRequired = 0; int8_t syncRequired = 0;
// pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; pPeer->role = newPeerRole;
pPeer->role = newRole; sTrace("%s, peer role:%s change to %s", pPeer->id, syncRole[oldPeerRole], syncRole[newPeerRole]);
sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
SSyncPeer *pMaster = syncCheckMaster(pNode); SSyncPeer *pMaster = syncCheckMaster(pNode);
if (pMaster) { if (pMaster) {
// master is there // master is there
pNode->pMaster = pMaster; pNode->pMaster = pMaster;
sDebug("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version); sTrace("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version);
if (syncValidateMaster(pPeer) < 0) return; if (syncValidateMaster(pPeer) < 0) return;
if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) {
if (nodeVersion < pMaster->version) { if (nodeVersion < pMaster->version) {
sTrace("%s, is master, sync required, self sver:%" PRIu64, pMaster->id, nodeVersion);
syncRequired = 1; syncRequired = 1;
} else { } else {
sInfo("%s is master, work as slave, sver:%" PRIu64, pMaster->id, pMaster->version); sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
} else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
// nodeVersion = pMaster->version; sTrace("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion);
} }
} else { } else {
// master not there, if all peer's state and version are consistent, choose the master // master not there, if all peer's state and version are consistent, choose the master
int32_t consistent = 0; int32_t consistent = 0;
if (peersStatus) { int32_t index = 0;
for (i = 0; i < pNode->replica; ++i) { if (peersStatus != NULL) {
SSyncPeer *pTemp = pNode->peerInfo[i]; for (index = 0; index < pNode->replica; ++index) {
if (pTemp->role != peersStatus[i].role) break; SSyncPeer *pTemp = pNode->peerInfo[index];
if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break; if (pTemp->role != peersStatus[index].role) break;
if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[index].version)) break;
} }
if (i >= pNode->replica) consistent = 1; if (index >= pNode->replica) consistent = 1;
} else { } else {
if (pNode->replica == 2) consistent = 1; if (pNode->replica == 2) consistent = 1;
} }
if (consistent) { if (consistent) {
sTrace("vgId:%d, choose master", pNode->vgId);
syncChooseMaster(pNode); syncChooseMaster(pNode);
} else {
sTrace("vgId:%d, version inconsistent, cannot choose master", pNode->vgId);
} }
} }
...@@ -750,7 +750,8 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne ...@@ -750,7 +750,8 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
syncRecoverFromMaster(pMaster); syncRecoverFromMaster(pMaster);
} }
if (peerOldRole != newRole || nodeRole != selfOldRole) { if (oldPeerRole != newPeerRole || nodeRole != oldSelfRole) {
sDebug("vgId:%d, roles changed, broadcast status", pNode->vgId);
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
} }
...@@ -760,7 +761,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne ...@@ -760,7 +761,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
} }
static void syncRestartPeer(SSyncPeer *pPeer) { static void syncRestartPeer(SSyncPeer *pPeer) {
sDebug("%s, restart connection", pPeer->id); sDebug("%s, restart peer connection", pPeer->id);
syncClosePeerConn(pPeer); syncClosePeerConn(pPeer);
...@@ -768,6 +769,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) { ...@@ -768,6 +769,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, check peer connection in 1000 ms", pPeer->id);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
...@@ -862,7 +864,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -862,7 +864,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("%s, failed to send sync-req to peer", pPeer->id); sError("%s, failed to send sync-req to peer", pPeer->id);
} else { } else {
nodeSStatus = TAOS_SYNC_STATUS_START; nodeSStatus = TAOS_SYNC_STATUS_START;
...@@ -1001,7 +1003,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type ...@@ -1001,7 +1003,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
} }
int32_t retLen = write(pPeer->peerFd, msg, statusMsgLen); int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen);
if (retLen == statusMsgLen) { if (retLen == statusMsgLen) {
sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, syncRole[pPeersStatus->role], sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, syncRole[pPeersStatus->role],
pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]); pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
...@@ -1009,8 +1011,6 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type ...@@ -1009,8 +1011,6 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
sDebug("%s, failed to send status msg, restart", pPeer->id); sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
return;
} }
static void syncSetupPeerConnection(SSyncPeer *pPeer) { static void syncSetupPeerConnection(SSyncPeer *pPeer) {
...@@ -1025,7 +1025,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1025,7 +1025,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) { if (connFd < 0) {
sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
} }
...@@ -1038,15 +1038,15 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1038,15 +1038,15 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup", pPeer->id); sDebug("%s, connection to peer server is setup", pPeer->id);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
} else { } else {
sDebug("try later"); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
close(connFd); taosClose(connFd);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
......
...@@ -291,7 +291,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -291,7 +291,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
} }
void *syncRestoreData(void *param) { void *syncRestoreData(void *param) {
SSyncPeer *pPeer = (SSyncPeer *)param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
...@@ -300,7 +300,8 @@ void *syncRestoreData(void *param) { ...@@ -300,7 +300,8 @@ void *syncRestoreData(void *param) {
(*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING); (*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING);
if (syncOpenRecvBuffer(pNode) < 0) { if (syncOpenRecvBuffer(pNode) < 0) {
sError("%s, failed to allocate recv buffer", pPeer->id); sError("%s, failed to allocate recv buffer, restart connection", pPeer->id);
syncRestartConnection(pPeer);
} else { } else {
if (syncRestoreDataStepByStep(pPeer) == 0) { if (syncRestoreDataStepByStep(pPeer) == 0) {
sInfo("%s, it is synced successfully", pPeer->id); sInfo("%s, it is synced successfully", pPeer->id);
......
...@@ -150,7 +150,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { ...@@ -150,7 +150,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
} }
void taosFreeTcpConn(void *param) { void taosFreeTcpConn(void *param) {
SConnObj * pConn = (SConnObj *)param; SConnObj * pConn = param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册