提交 fa5d57a4 编写于 作者: R root

TD-1382

上级 a0f01c7d
...@@ -35,16 +35,14 @@ int tsSyncTcpThreads = 2; ...@@ -35,16 +35,14 @@ int tsSyncTcpThreads = 2;
int tsMaxWatchFiles = 500; int tsMaxWatchFiles = 500;
int tsMaxFwdInfo = 200; int tsMaxFwdInfo = 200;
int tsSyncTimer = 1; int tsSyncTimer = 1;
//int sDebugFlag = 135;
//char tsArbitrator[TSDB_FQDN_LEN] = {0};
// module global, not configurable // module global, not configurable
int tsSyncNum; // number of sync in process in whole system int tsSyncNum; // number of sync in process in whole system
char tsNodeFqdn[TSDB_FQDN_LEN]; char tsNodeFqdn[TSDB_FQDN_LEN];
static ttpool_h tsTcpPool; static ttpool_h tsTcpPool;
static void *syncTmrCtrl = NULL; static void * syncTmrCtrl = NULL;
static void *vgIdHash; static void * vgIdHash;
// local functions // local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
...@@ -164,9 +162,10 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -164,9 +162,10 @@ void *syncStart(const SSyncInfo *pInfo) {
for (int i = 0; i < pCfg->replica; ++i) { for (int i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
pNode->selfIndex = i; pNode->selfIndex = i;
} }
}
if (pNode->selfIndex < 0) { if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId); sInfo("vgId:%d, this node is not configured", pNode->vgId);
...@@ -198,14 +197,15 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -198,14 +197,15 @@ void *syncStart(const SSyncInfo *pInfo) {
syncAddNodeRef(pNode); syncAddNodeRef(pNode);
taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
if (pNode->notifyRole) if (pNode->notifyRole) {
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
}
return pNode; return pNode;
} }
void syncStop(void *param) { void syncStop(void *param) {
SSyncNode * pNode = param; SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
if (pNode == NULL) return; if (pNode == NULL) return;
...@@ -230,12 +230,12 @@ void syncStop(void *param) { ...@@ -230,12 +230,12 @@ void syncStop(void *param) {
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
SSyncNode * pNode = param; SSyncNode *pNode = param;
int i, j; int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNewCfg->replica, pNode->replica); pNode->replica);
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
...@@ -268,17 +268,19 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { ...@@ -268,17 +268,19 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
newPeers[i] = pNode->peerInfo[j]; newPeers[i] = pNode->peerInfo[j];
} }
if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) {
pNode->selfIndex = i; pNode->selfIndex = i;
} }
}
pNode->replica = pNewCfg->replica; pNode->replica = pNewCfg->replica;
pNode->quorum = pNewCfg->quorum; pNode->quorum = pNewCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica); memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica);
for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) {
pNode->peerInfo[i] = NULL; pNode->peerInfo[i] = NULL;
}
syncAddArbitrator(pNode); syncAddArbitrator(pNode);
...@@ -290,17 +292,18 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { ...@@ -290,17 +292,18 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
syncRole[nodeRole]);
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
SSyncNode * pNode = param; SSyncNode *pNode = param;
SSyncPeer * pPeer; SSyncPeer *pPeer;
SSyncHead *pSyncHead; SSyncHead *pSyncHead;
SWalHead *pWalHead = data; SWalHead * pWalHead = data;
int fwdLen; int fwdLen;
int code = 0; int code = 0;
...@@ -308,23 +311,23 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -308,23 +311,23 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
// always update version // always update version
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER ) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded // only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance // a hacker way to improve the performance
pSyncHead = (SSyncHead *) ( ((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD; pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0; pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len; pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); //include the WAL and SYNC head fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer == NULL || pPeer->peerFd <0) continue; if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) { if (pNode->quorum > 1 && code == 0) {
...@@ -356,7 +359,7 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { ...@@ -356,7 +359,7 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) {
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
SSyncHead *pHead = (SSyncHead *) msg; SSyncHead *pHead = (SSyncHead *)msg;
pHead->type = TAOS_SMSG_FORWARD_RSP; pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp); pHead->len = sizeof(SFwdRsp);
...@@ -389,7 +392,7 @@ void syncRecover(void *param) { ...@@ -389,7 +392,7 @@ void syncRecover(void *param) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pPeer = (SSyncPeer *) pNode->peerInfo[i]; pPeer = (SSyncPeer *)pNode->peerInfo[i];
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
...@@ -402,7 +405,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { ...@@ -402,7 +405,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i=0; i<pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
pNodesRole->role[i] = pNode->peerInfo[i]->role; pNodesRole->role[i] = pNode->peerInfo[i]->role;
} }
...@@ -439,13 +442,11 @@ static void syncAddArbitrator(SSyncNode *pNode) { ...@@ -439,13 +442,11 @@ static void syncAddArbitrator(SSyncNode *pNode) {
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
} }
static void syncAddNodeRef(SSyncNode *pNode) static void syncAddNodeRef(SSyncNode *pNode) {
{
atomic_add_fetch_8(&pNode->refCount, 1); atomic_add_fetch_8(&pNode->refCount, 1);
} }
static void syncDecNodeRef(SSyncNode *pNode) static void syncDecNodeRef(SSyncNode *pNode) {
{
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
pthread_mutex_destroy(&pNode->mutex); pthread_mutex_destroy(&pNode->mutex);
taosTFree(pNode->pRecv); taosTFree(pNode->pRecv);
...@@ -492,7 +493,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -492,7 +493,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL; if (ip == -1) return NULL;
SSyncPeer *pPeer = (SSyncPeer *) calloc(1, sizeof(SSyncPeer)); SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL; if (pPeer == NULL) return NULL;
pPeer->nodeId = pInfo->nodeId; pPeer->nodeId = pInfo->nodeId;
...@@ -511,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -511,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
int ret = strcmp(pPeer->fqdn, tsNodeFqdn); int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, start to check peer connection", pPeer->id); sDebug("%s, start to check peer connection", pPeer->id);
taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId*10)%100, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId * 10) % 100, pPeer, syncTmrCtrl, &pPeer->timer);
} }
syncAddNodeRef(pNode); syncAddNodeRef(pNode);
...@@ -547,18 +548,20 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -547,18 +548,20 @@ static void syncChooseMaster(SSyncNode *pNode) {
sDebug("vgId:%d, choose master", pNode->vgId); sDebug("vgId:%d, choose master", pNode->vgId);
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++; onlineNum++;
} }
}
if (onlineNum == pNode->replica) { if (onlineNum == pNode->replica) {
// if all peers are online, peer with highest version shall be master // if all peers are online, peer with highest version shall be master
index = 0; index = 0;
for (int i = 1; i < pNode->replica; ++i) { for (int i = 1; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) {
index = i; index = i;
} }
} }
}
// add arbitrator connection // add arbitrator connection
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
...@@ -573,11 +576,12 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -573,11 +576,12 @@ static void syncChooseMaster(SSyncNode *pNode) {
//slave with highest version shall be master //slave with highest version shall be master
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) {
index = i; index = i;
} }
} }
} }
}
if (index >= 0) { if (index >= 0) {
if (index == pNode->selfIndex) { if (index == pNode->selfIndex) {
...@@ -600,9 +604,10 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { ...@@ -600,9 +604,10 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int replica = pNode->replica; int replica = pNode->replica;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++; onlineNum++;
} }
}
// add arbitrator connection // add arbitrator connection
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
...@@ -649,7 +654,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { ...@@ -649,7 +654,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
code = -1; code = -1;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
if ( i == pNode->selfIndex ) continue; if (i == pNode->selfIndex) continue;
syncRestartPeer(pNode->peerInfo[i]); syncRestartPeer(pNode->peerInfo[i]);
} }
} }
...@@ -666,12 +671,11 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne ...@@ -666,12 +671,11 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
pPeer->role = newRole; pPeer->role = newRole;
sDebug("%s, own role:%s, new peer role:%s", pPeer->id, sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
syncRole[nodeRole], syncRole[pPeer->role]);
SSyncPeer *pMaster = syncCheckMaster(pNode); SSyncPeer *pMaster = syncCheckMaster(pNode);
if ( pMaster ) { if (pMaster) {
// master is there // master is there
pNode->pMaster = pMaster; pNode->pMaster = pMaster;
sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version); sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version);
...@@ -704,19 +708,22 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne ...@@ -704,19 +708,22 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if (pNode->replica == 2) consistent = 1; if (pNode->replica == 2) consistent = 1;
} }
if (consistent) if (consistent) {
syncChooseMaster(pNode); syncChooseMaster(pNode);
} }
}
if (syncRequired) { if (syncRequired) {
syncRecoverFromMaster(pMaster); syncRecoverFromMaster(pMaster);
} }
if (peerOldRole != newRole || nodeRole != selfOldRole) if (peerOldRole != newRole || nodeRole != selfOldRole) {
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
}
if (nodeRole != TAOS_SYNC_ROLE_MASTER) if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
syncResetFlowCtrl(pNode); syncResetFlowCtrl(pNode);
}
} }
static void syncRestartPeer(SSyncPeer *pPeer) { static void syncRestartPeer(SSyncPeer *pPeer) {
...@@ -727,8 +734,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) { ...@@ -727,8 +734,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer->sstatus = TAOS_SYNC_STATUS_INIT; pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
int ret = strcmp(pPeer->fqdn, tsNodeFqdn); int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
}
} }
void syncRestartConnection(SSyncPeer *pPeer) { void syncRestartConnection(SSyncPeer *pPeer) {
...@@ -810,7 +818,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -810,7 +818,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
return; return;
} }
sDebug("%s, try to sync", pPeer->id) sDebug("%s, try to sync", pPeer->id);
SFirstPkt firstPkt; SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt)); memset(&firstPkt, 0, sizeof(firstPkt));
...@@ -819,31 +827,29 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -819,31 +827,29 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
taosTmrReset(syncNotStarted, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt) ) { if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("%s, failed to send sync-req to peer", pPeer->id); sError("%s, failed to send sync-req to peer", pPeer->id);
} else { } else {
nodeSStatus = TAOS_SYNC_STATUS_START; nodeSStatus = TAOS_SYNC_STATUS_START;
sInfo("%s, sync-req is sent", pPeer->id); sInfo("%s, sync-req is sent", pPeer->id);
} }
return;
} }
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp *pFwdRsp = (SFwdRsp *) cont; SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo *pFwdInfo; SFwdInfo * pFwdInfo;
sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version); sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first // find the forwardInfo from first
for (int i=0; i<pSyncFwds->fwds; ++i) { for (int i = 0; i < pSyncFwds->fwds; ++i) {
pFwdInfo = pSyncFwds->fwdInfo + (i+pSyncFwds->first)%tsMaxFwdInfo; pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
if (pFwdRsp->version == pFwdInfo->version) break; if (pFwdRsp->version == pFwdInfo->version) break;
} }
...@@ -853,13 +859,13 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { ...@@ -853,13 +859,13 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
} }
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead *pHead = (SWalHead *)cont; SWalHead * pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
//nodeVersion = pHead->version; // nodeVersion = pHead->version;
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
} else { } else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
...@@ -882,12 +888,13 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { ...@@ -882,12 +888,13 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
pPeer->version = pPeersStatus->version; pPeer->version = pPeersStatus->version;
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role); syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
if (pPeersStatus->ack) if (pPeersStatus->ack) {
syncSendPeersStatusMsgToPeer(pPeer, 0); syncSendPeersStatusMsgToPeer(pPeer, 0);
}
} }
static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
if (pPeer->peerFd <0) return -1; if (pPeer->peerFd < 0) return -1;
int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) { if (hlen != sizeof(SSyncHead)) {
...@@ -911,9 +918,9 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -911,9 +918,9 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
} }
static int syncProcessPeerMsg(void *param, void *buffer) { static int syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer * pPeer = param; SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead head;
char *cont = (char *)buffer; char * cont = (char *)buffer;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
...@@ -937,16 +944,16 @@ static int syncProcessPeerMsg(void *param, void *buffer) { ...@@ -937,16 +944,16 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
return code; return code;
} }
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA #define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0}; char msg[statusMsgLen] = {0};
if (pPeer->peerFd <0 || pPeer->ip ==0) return; if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
SSyncHead *pHead = (SSyncHead *) msg; SSyncHead * pHead = (SSyncHead *)msg;
SPeersStatus *pPeersStatus = (SPeersStatus *) (msg + sizeof(SSyncHead)); SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead));
pHead->type = TAOS_SMSG_STATUS; pHead->type = TAOS_SMSG_STATUS;
pHead->len = statusMsgLen - sizeof(SSyncHead); pHead->len = statusMsgLen - sizeof(SSyncHead);
...@@ -984,13 +991,13 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -984,13 +991,13 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) { if (connFd < 0) {
sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
return; return;
} }
SFirstPkt firstPkt; SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt)); memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId:0; firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
firstPkt.syncHead.type = TAOS_SMSG_STATUS; firstPkt.syncHead.type = TAOS_SMSG_STATUS;
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
...@@ -1005,7 +1012,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1005,7 +1012,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
} else { } else {
sDebug("try later"); sDebug("try later");
close(connFd); close(connFd);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
} }
} }
...@@ -1070,8 +1077,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { ...@@ -1070,8 +1077,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break;
break;
} }
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
...@@ -1096,8 +1102,6 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { ...@@ -1096,8 +1102,6 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
} }
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
return;
} }
static void syncProcessBrokenLink(void *param) { static void syncProcessBrokenLink(void *param) {
...@@ -1128,8 +1132,10 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { ...@@ -1128,8 +1132,10 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
pSyncFwds->fwds--; pSyncFwds->fwds--;
} }
if (pSyncFwds->fwds > 0) if (pSyncFwds->fwds > 0) {
pSyncFwds->last = (pSyncFwds->last+1) % tsMaxFwdInfo; pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo;
}
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
pFwdInfo->version = version; pFwdInfo->version = version;
pFwdInfo->mhandle = mhandle; pFwdInfo->mhandle = mhandle;
...@@ -1145,14 +1151,14 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { ...@@ -1145,14 +1151,14 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int fwds = pSyncFwds->fwds; int fwds = pSyncFwds->fwds;
for (int i=0; i<fwds; ++i) { for (int i = 0; i < fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break; if (pFwdInfo->confirmed == 0) break;
pSyncFwds->first = (pSyncFwds->first+1) % tsMaxFwdInfo; pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
pSyncFwds->fwds--; pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
//sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d", // sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d",
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); // pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
memset(pFwdInfo, 0, sizeof(SFwdInfo)); memset(pFwdInfo, 0, sizeof(SFwdInfo));
} }
...@@ -1164,13 +1170,15 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1164,13 +1170,15 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
if (code == 0) { if (code == 0) {
pFwdInfo->acks++; pFwdInfo->acks++;
if (pFwdInfo->acks >= pNode->quorum-1) if (pFwdInfo->acks >= pNode->quorum - 1) {
confirm = 1; confirm = 1;
}
} else { } else {
pFwdInfo->nacks++; pFwdInfo->nacks++;
if (pFwdInfo->nacks > pNode->replica-pNode->quorum) if (pFwdInfo->nacks > pNode->replica - pNode->quorum) {
confirm = 1; confirm = 1;
} }
}
if (confirm && pFwdInfo->confirmed == 0) { if (confirm && pFwdInfo->confirmed == 0) {
sDebug("vgId:%d, forward is confirmed, ver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); sDebug("vgId:%d, forward is confirmed, ver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
...@@ -1186,8 +1194,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1186,8 +1194,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int i=0; i<pSyncFwds->fwds; ++i) { for (int i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first+i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break; if (time - pFwdInfo->time < 2000) break;
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册