提交 0ce12e8f 编写于 作者: S Shengliang Guan

TD-2524

上级 f82f11e2
...@@ -59,6 +59,8 @@ static void syncRestartPeer(SSyncPeer *pPeer); ...@@ -59,6 +59,8 @@ static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp); static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
static void syncStartCheckPeerConn(SSyncPeer *pPeer);
static void syncStopCheckPeerConn(SSyncPeer *pPeer);
static SSyncNode *syncAcquireNode(int64_t rid); static SSyncNode *syncAcquireNode(int64_t rid);
static void syncReleaseNode(SSyncNode *pNode); static void syncReleaseNode(SSyncNode *pNode);
...@@ -250,6 +252,11 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -250,6 +252,11 @@ int64_t syncStart(const SSyncInfo *pInfo) {
(*pNode->notifyRole)(pNode->vgId, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} }
syncStartCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb
for (int32_t index = 0; index < pNode->replica; ++index) {
syncStartCheckPeerConn(pNode->peerInfo[index]);
}
return pNode->rid; return pNode->rid;
} }
...@@ -292,6 +299,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -292,6 +299,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb
for (int32_t index = 0; index < pNode->replica; ++index) {
syncStopCheckPeerConn(pNode->peerInfo[index]);
}
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
for (j = 0; j < pNewCfg->replica; ++j) { for (j = 0; j < pNewCfg->replica; ++j) {
if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) && if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) &&
...@@ -348,6 +360,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -348,6 +360,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
(*pNode->notifyRole)(pNode->vgId, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} }
syncStartCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb
for (int32_t index = 0; index < pNode->replica; ++index) {
syncStartCheckPeerConn(pNode->peerInfo[index]);
}
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum);
...@@ -539,6 +556,26 @@ static void syncRemovePeer(SSyncPeer *pPeer) { ...@@ -539,6 +556,26 @@ static void syncRemovePeer(SSyncPeer *pPeer) {
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
} }
static void syncStartCheckPeerConn(SSyncPeer *pPeer) {
if (pPeer == NULL) return;
SSyncNode *pNode = pPeer->pSyncNode;
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs;
sDebug("%s, check peer connection after %d ms", pPeer->id, checkMs);
taosTmrReset(syncCheckPeerConnection, checkMs, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
}
}
static void syncStopCheckPeerConn(SSyncPeer *pPeer) {
if (pPeer == NULL) return;
taosTmrStopA(&pPeer->timer);
sDebug("%s, stop check peer connection", pPeer->id);
}
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == 0xFFFFFFFF) { if (ip == 0xFFFFFFFF) {
...@@ -565,13 +602,6 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -565,13 +602,6 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->rid = taosAddRef(tsPeerRefId, pPeer); pPeer->rid = taosAddRef(tsPeerRefId, pPeer);
sInfo("%s, %p it is configured, ep:%s:%u rid:%" PRId64, pPeer->id, pPeer, pPeer->fqdn, pPeer->port, pPeer->rid); sInfo("%s, %p it is configured, ep:%s:%u rid:%" PRId64, pPeer->id, pPeer, pPeer->fqdn, pPeer->port, pPeer->rid);
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs;
sDebug("%s, check peer connection after %d ms", pPeer->id, checkMs);
taosTmrReset(syncCheckPeerConnection, checkMs, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
}
(void)syncAcquireNode(pNode->rid); (void)syncAcquireNode(pNode->rid);
return pPeer; return pPeer;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册