Commit 0a90950b authored by Shengliang Guan

TD-2157

Parent b6540b44
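In brief, since the message references only issue TD-2157: this change introduces a syncStatus[] string table for the sync state byte (sstatus) and threads it through the sync-module logs, so state transitions print a readable name instead of a bare number. It also logs the peer and sync fds at connection setup, close, and broken-link handling, resets sstatus when a sync connection never comes up, narrows the role monitor to at most one status probe per timer tick, drops the redundant parentheses in pthread_mutex_unlock(&(pNode->mutex)) calls, and stops treating EAGAIN/EWOULDBLOCK as retryable in taosWriteMsg/taosReadMsg.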
@@ -172,6 +172,7 @@ typedef struct SSyncNode {
 // sync module global
 extern int32_t tsSyncNum;
 extern char    tsNodeFqdn[TSDB_FQDN_LEN];
+extern char *  syncStatus[];
 
 void *syncRetrieveData(void *param);
 void *syncRestoreData(void *param);
...
@@ -73,6 +73,14 @@ char* syncRole[] = {
   "master"
 };
 
+char *syncStatus[] = {
+  "init",
+  "start",
+  "file",
+  "cache",
+  "invalid"
+};
+
 typedef enum {
   SYNC_STATUS_BROADCAST,
   SYNC_STATUS_BROADCAST_RSP,
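The table exists so the sstatus byte that drives the sync state machine can be printed by name; every log line touched below indexes it as syncStatus[...]. A minimal sketch of the pairing this relies on, assuming the enum ordering implied by the TAOS_SYNC_STATUS_* constants used in this commit (the defining header is not part of the diff):

```c
/* Assumed pairing: syncStatus[] must stay index-aligned with the status
 * enum, otherwise the new log lines print the wrong state name. */
typedef enum {
  TAOS_SYNC_STATUS_INIT,   /* "init"  : idle, no sync in progress       */
  TAOS_SYNC_STATUS_START,  /* "start" : sync requested, thread created  */
  TAOS_SYNC_STATUS_FILE,   /* "file"  : file-level transfer in progress */
  TAOS_SYNC_STATUS_CACHE,  /* "cache" : replaying wal/buffered forwards */
} ESyncStatusSketch;       /* hypothetical name, for illustration only  */

extern char *syncStatus[]; /* {"init", "start", "file", "cache", "invalid"} */
```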
@@ -282,7 +290,7 @@ void syncStop(int64_t rid) {
   pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
   if (pPeer) syncRemovePeer(pPeer);
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 
   taosReleaseRef(tsSyncRefId, rid);
   taosRemoveRef(tsSyncRefId, rid);
@@ -350,7 +358,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
     (*pNode->notifyRole)(pNode->vgId, nodeRole);
   }
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 
   sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum);
   syncBroadcastStatus(pNode);
@@ -423,7 +431,7 @@ void syncRecover(int64_t rid) {
     }
   }
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 
   taosReleaseRef(tsSyncRefId, rid);
 }
@@ -498,6 +506,8 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) {
 }
 
 static void syncClosePeerConn(SSyncPeer *pPeer) {
+  sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd);
+
   taosTmrStopA(&pPeer->timer);
   taosClose(pPeer->syncFd);
   if (pPeer->peerFd >= 0) {
@@ -751,7 +761,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
       sDebug("vgId:%d, choose master", pNode->vgId);
       syncChooseMaster(pNode);
     } else {
-      sDebug("vgId:%d, version inconsistent, cannot choose master", pNode->vgId);
+      sDebug("vgId:%d, cannot choose master since roles inconsistent", pNode->vgId);
     }
   }
@@ -770,11 +780,12 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
 }
 
 static void syncRestartPeer(SSyncPeer *pPeer) {
-  sDebug("%s, restart peer connection", pPeer->id);
+  sDebug("%s, restart peer connection, last sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
 
   syncClosePeerConn(pPeer);
 
   pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
+  sDebug("%s, peer conn is restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
 
   int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
   if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
@@ -803,7 +814,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
   }
 
   if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) {
-    sDebug("%s, sync is already started", pPeer->id);
+    sDebug("%s, sync is already started for sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
     return;  // already started
   }
@@ -821,7 +832,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
     syncDecPeerRef(pPeer);
   } else {
     pPeer->sstatus = TAOS_SYNC_STATUS_START;
-    sDebug("%s, thread is created to retrieve data", pPeer->id);
+    sDebug("%s, thread is created to retrieve data, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
   }
 }
@@ -831,9 +842,10 @@ static void syncNotStarted(void *param, void *tmrId) {
 
   pthread_mutex_lock(&pNode->mutex);
   pPeer->timer = NULL;
-  sInfo("%s, sync connection is still not up, restart", pPeer->id);
+  pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
+  sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
   syncRestartConnection(pPeer);
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 }
 
 static void syncTryRecoverFromMaster(void *param, void *tmrId) {
@@ -842,14 +854,14 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
 
   pthread_mutex_lock(&pNode->mutex);
   syncRecoverFromMaster(pPeer);
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 }
 
 static void syncRecoverFromMaster(SSyncPeer *pPeer) {
   SSyncNode *pNode = pPeer->pSyncNode;
 
   if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
-    sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus);
+    sDebug("%s, sync is already started since sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
     return;
   }
@@ -877,7 +889,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
     sError("%s, failed to send sync-req to peer", pPeer->id);
   } else {
     nodeSStatus = TAOS_SYNC_STATUS_START;
-    sInfo("%s, sync-req is sent", pPeer->id);
+    sInfo("%s, sync-req is sent to peer, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
   }
 }
@@ -915,7 +927,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
     if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
       syncSaveIntoBuffer(pPeer, pHead);
     } else {
-      sError("%s, forward discarded, hver:%" PRIu64, pPeer->id, pHead->version);
+      sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
+             pHead->version);
     }
   }
 }
@@ -924,8 +937,10 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
   SSyncNode *   pNode = pPeer->pSyncNode;
   SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
 
-  sDebug("%s, status msg is received, self:%s sver:%" PRIu64 " peer:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id,
-         syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
+  sDebug("%s, status msg is received, self:%s sstatus:%s sver:%" PRIu64 ", peer:%s sver:%" PRIu64
+         ", ack:%d tranId:%u type:%s pfd:%d",
+         pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role],
+         pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type], pPeer->peerFd);
 
   pPeer->version = pPeersStatus->version;
   syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
@@ -982,7 +997,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
     }
   }
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
   return code;
 }
@@ -1014,8 +1029,11 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
 
   int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen);
   if (retLen == statusMsgLen) {
-    sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, syncRole[pPeersStatus->role],
-           pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
+    sDebug("%s, status msg is sent, self:%s sstatus:%s sver:%" PRIu64 ", peer:%s sstatus:%s sver:%" PRIu64
+           ", ack:%d tranId:%u type:%s pfd:%d",
+           pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
+           syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId,
+           statusType[pPeersStatus->type], pPeer->peerFd);
   } else {
     sDebug("%s, failed to send status msg, restart", pPeer->id);
     syncRestartConnection(pPeer);
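To read these status-message logs it helps to see the message shape; here is a sketch inferred purely from the field accesses visible in this diff (the real definitions live in the sync headers and may differ in types, order, and packing):

```c
#include <stdint.h>

/* Hypothetical reconstruction for illustration, not the actual structs. */
typedef struct {
  int8_t   role;              /* a peer's role, printed via syncRole[]    */
  uint64_t version;           /* a peer's wal version, "sver" in the logs */
} SPeerStatusSketch;

typedef struct {
  int8_t            role;     /* sender's own role                             */
  int8_t            ack;      /* whether a reply is requested ("ack:%d")       */
  int8_t            type;     /* message kind, printed via statusType[]        */
  uint32_t          tranId;   /* transaction id for matching request/response  */
  uint64_t          version;  /* sender's wal version                          */
  SPeerStatusSketch peersStatus[]; /* sender's view of every replica           */
} SPeersStatusSketch;
```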
@@ -1048,7 +1066,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
   firstPkt.sourceId = pNode->vgId;  // tell arbitrator its vgId
 
   if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
-    sDebug("%s, connection to peer server is setup", pPeer->id);
+    sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d", pPeer->id, connFd, pPeer->syncFd);
     pPeer->peerFd = connFd;
     pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
     pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
@@ -1069,7 +1087,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
   sDebug("%s, check peer connection", pPeer->id);
   syncSetupPeerConnection(pPeer);
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 }
 
 static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
@@ -1135,7 +1153,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
     pPeer->syncFd = connFd;
     syncCreateRestoreDataThread(pPeer);
   } else {
-    sDebug("%s, TCP connection is already up, close one", pPeer->id);
+    sDebug("%s, TCP connection is already up(pfd:%d), close one, new pfd:%d sfd:%d", pPeer->id, pPeer->peerFd, connFd,
+           pPeer->syncFd);
     syncClosePeerConn(pPeer);
     pPeer->peerFd = connFd;
     pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
@@ -1145,7 +1164,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
     }
   }
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 }
 
 static void syncProcessBrokenLink(void *param) {
@@ -1156,14 +1175,14 @@ static void syncProcessBrokenLink(void *param) {
 
   if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return;
   pthread_mutex_lock(&pNode->mutex);
 
-  sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno));
+  sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd);
   pPeer->peerFd = -1;
 
   if (syncDecPeerRef(pPeer) != 0) {
     syncRestartConnection(pPeer);
   }
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
   taosReleaseRef(tsSyncRefId, pNode->rid);
 }
@@ -1239,10 +1258,13 @@ static void syncMonitorNodeRole(void *param, void *tmrId) {
     if (index == pNode->selfIndex) continue;
 
     SSyncPeer *pPeer = pNode->peerInfo[index];
-    if (pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue;
-    if (pPeer->sstatus > TAOS_SYNC_STATUS_INIT || nodeSStatus > TAOS_SYNC_STATUS_INIT) continue;
+    if (/*pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && */ nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue;
+    if (/*pPeer->sstatus > TAOS_SYNC_STATUS_INIT || */ nodeSStatus > TAOS_SYNC_STATUS_INIT) continue;
 
+    sDebug("%s, check roles since self:%s sstatus:%s, peer:%s sstatus:%s", pPeer->id, syncRole[pPeer->role],
+           syncStatus[pPeer->sstatus], syncRole[nodeRole], syncStatus[nodeSStatus]);
     syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId());
+    break;
   }
 
   pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
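The two commented-out peer-side conditions plus the new break change the monitor's behavior: eligibility now depends only on the local node's role and sync status, and since those checks do not vary with the loop index, each SYNC_ROLE_TIMER tick now probes either the first non-self peer or no one at all. A condensed sketch of the resulting loop (the loop bounds sit outside the hunk and are assumed):

```c
for (int32_t index = 0; index < pNode->replica; ++index) {  /* bounds assumed */
  if (index == pNode->selfIndex) continue;
  if (nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue;   /* local role already settled */
  if (nodeSStatus > TAOS_SYNC_STATUS_INIT) continue;  /* a sync is already running  */
  /* probe exactly one peer, then wait for the next tick */
  syncSendPeersStatusMsgToPeer(pNode->peerInfo[index], 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId());
  break;
}
```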
@@ -1271,7 +1293,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
     }
 
     syncRemoveConfirmedFwdInfo(pNode);
-    pthread_mutex_unlock(&(pNode->mutex));
+    pthread_mutex_unlock(&pNode->mutex);
   }
 
   pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
@@ -1339,7 +1361,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
     }
   }
 
-  pthread_mutex_unlock(&(pNode->mutex));
+  pthread_mutex_unlock(&pNode->mutex);
 
   return code;
 }
...
@@ -267,7 +267,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
   nodeSStatus = TAOS_SYNC_STATUS_FILE;
   uint64_t fversion = 0;
 
-  sDebug("%s, start to restore file", pPeer->id);
+  sDebug("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
   int32_t code = syncRestoreFile(pPeer, &fversion);
   if (code < 0) {
     sError("%s, failed to restore file", pPeer->id);
@@ -291,7 +291,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
   }
 
   nodeSStatus = TAOS_SYNC_STATUS_CACHE;
-  sDebug("%s, start to insert buffered points", pPeer->id);
+  sDebug("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
   if (syncProcessBufferedFwd(pPeer) < 0) {
     sError("%s, failed to insert buffered points", pPeer->id);
     return -1;
@@ -327,6 +327,8 @@ void *syncRestoreData(void *param) {
   (*pNode->notifyRole)(pNode->vgId, nodeRole);
 
   nodeSStatus = TAOS_SYNC_STATUS_INIT;
+  sInfo("%s, sync over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
+
   taosClose(pPeer->syncFd);
   syncCloseRecvBuffer(pNode);
   __sync_fetch_and_sub(&tsSyncNum, 1);
...
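Taken together, the new "set sstatus" log lines trace a fixed ladder on the node being synchronized; a comment-only sketch of the lifecycle as inferred from this diff:

```c
/* sstatus lifecycle inferred from the logs added in this commit:
 *
 *   TAOS_SYNC_STATUS_INIT   idle; syncRecoverFromMaster() proceeds only here
 *   TAOS_SYNC_STATUS_START  sync-req sent / retrieve thread created
 *   TAOS_SYNC_STATUS_FILE   file transfer (syncRestoreDataStepByStep here,
 *                           syncRetrieveDataStepByStep on the sending side)
 *   TAOS_SYNC_STATUS_CACHE  wal replay and buffered forward packets
 *   TAOS_SYNC_STATUS_INIT   back to idle once syncRestoreData() finishes
 */
```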
@@ -318,6 +318,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
     if (((event & IN_MODIFY) == 0) || once) {
       if (fversion == 0) {
         pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;  // start to forward pkt
+        sDebug("%s, fversion is 0 then set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
         fversion = nodeVersion;  // must read data to fversion
       }
     }
@@ -416,8 +417,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
   }
 
   if (code == 0) {
-    sInfo("%s, wal retrieve is finished", pPeer->id);
     pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
+    sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
+
     SWalHead walHead;
     memset(&walHead, 0, sizeof(walHead));
     code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
@@ -445,7 +447,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
   pPeer->sversion = 0;
   pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
-  sInfo("%s, start to retrieve file", pPeer->id);
+  sInfo("%s, start to retrieve file, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
   if (syncRetrieveFile(pPeer) < 0) {
     sError("%s, failed to retrieve file", pPeer->id);
     return -1;
...
@@ -107,7 +107,7 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
   while (nleft > 0) {
     nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
     if (nwritten <= 0) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
+      if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */)
         continue;
       else
         return -1;
@@ -133,7 +133,7 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
     if (nread == 0) {
       break;
     } else if (nread < 0) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+      if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) {
         continue;
       } else {
         return -1;
...
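Commenting out EAGAIN/EWOULDBLOCK means a would-block or timeout condition now fails the whole message instead of being retried in a tight loop. One plausible reading, not stated in the commit: these sockets are blocking, possibly with send/receive timeouts, so EAGAIN signals a stuck peer and should surface as an error that drives the restart/broken-link path rather than an endless retry. A self-contained sketch of the post-change read loop under that assumption:

```c
#include <errno.h>
#include <stdint.h>
#include <unistd.h>

/* Sketch of the post-change retry policy: only EINTR is retried; EAGAIN /
 * EWOULDBLOCK (e.g. an SO_RCVTIMEO timeout, if one is set) and all other
 * errors abort so the caller can tear down and restart the connection. */
static int32_t readMsgSketch(int fd, char *buf, int32_t nbytes) {
  int32_t nleft = nbytes;
  while (nleft > 0) {
    ssize_t nread = read(fd, buf, (size_t)nleft);
    if (nread == 0) break;            /* peer closed the connection     */
    if (nread < 0) {
      if (errno == EINTR) continue;   /* interrupted by a signal: retry */
      return -1;                      /* timeout or hard error: give up */
    }
    buf += nread;
    nleft -= (int32_t)nread;
  }
  return nbytes - nleft;              /* bytes actually read            */
}
```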