From 0a90950bdfaddda5133ecd33596e6b7440105b26 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 Nov 2020 10:08:15 +0000 Subject: [PATCH] TD-2157 --- src/sync/inc/syncInt.h | 1 + src/sync/src/syncMain.c | 78 ++++++++++++++++++++++++------------- src/sync/src/syncRestore.c | 6 ++- src/sync/src/syncRetrieve.c | 6 ++- src/util/src/tsocket.c | 4 +- 5 files changed, 61 insertions(+), 34 deletions(-) diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index aacd72e930..7dbb1985d4 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -172,6 +172,7 @@ typedef struct SSyncNode { // sync module global extern int32_t tsSyncNum; extern char tsNodeFqdn[TSDB_FQDN_LEN]; +extern char * syncStatus[]; void *syncRetrieveData(void *param); void *syncRestoreData(void *param); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index dd759ed9d4..a68f0b7bbb 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -73,6 +73,14 @@ char* syncRole[] = { "master" }; +char *syncStatus[] = { + "init", + "start", + "file", + "cache", + "invalid" +}; + typedef enum { SYNC_STATUS_BROADCAST, SYNC_STATUS_BROADCAST_RSP, @@ -282,7 +290,7 @@ void syncStop(int64_t rid) { pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; if (pPeer) syncRemovePeer(pPeer); - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); taosReleaseRef(tsSyncRefId, rid); taosRemoveRef(tsSyncRefId, rid); @@ -350,7 +358,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { (*pNode->notifyRole)(pNode->vgId, nodeRole); } - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); syncBroadcastStatus(pNode); @@ -423,7 +431,7 @@ void syncRecover(int64_t rid) { } } - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); taosReleaseRef(tsSyncRefId, rid); } @@ -498,6 +506,8 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) { } static void syncClosePeerConn(SSyncPeer *pPeer) { + sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd); + taosTmrStopA(&pPeer->timer); taosClose(pPeer->syncFd); if (pPeer->peerFd >= 0) { @@ -751,7 +761,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new sDebug("vgId:%d, choose master", pNode->vgId); syncChooseMaster(pNode); } else { - sDebug("vgId:%d, version inconsistent, cannot choose master", pNode->vgId); + sDebug("vgId:%d, cannot choose master since roles inconsistent", pNode->vgId); } } @@ -770,11 +780,12 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new } static void syncRestartPeer(SSyncPeer *pPeer) { - sDebug("%s, restart peer connection", pPeer->id); + sDebug("%s, restart peer connection, last sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); syncClosePeerConn(pPeer); pPeer->sstatus = TAOS_SYNC_STATUS_INIT; + sDebug("%s, peer conn is restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { @@ -803,7 +814,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { } if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) { - sDebug("%s, sync is already started", pPeer->id); + sDebug("%s, sync is already started for sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); return; // already started } @@ -821,7 +832,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { syncDecPeerRef(pPeer); } else { pPeer->sstatus = TAOS_SYNC_STATUS_START; - sDebug("%s, thread is created to retrieve data", pPeer->id); + sDebug("%s, thread is created to retrieve data, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); } } @@ -831,9 +842,10 @@ static void syncNotStarted(void *param, void *tmrId) { pthread_mutex_lock(&pNode->mutex); pPeer->timer = NULL; - sInfo("%s, sync connection is still not up, restart", pPeer->id); + pPeer->sstatus = TAOS_SYNC_STATUS_INIT; + sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); syncRestartConnection(pPeer); - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); } static void syncTryRecoverFromMaster(void *param, void *tmrId) { @@ -842,14 +854,14 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { pthread_mutex_lock(&pNode->mutex); syncRecoverFromMaster(pPeer); - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); } static void syncRecoverFromMaster(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { - sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); + sDebug("%s, sync is already started since sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); return; } @@ -877,7 +889,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { sError("%s, failed to send sync-req to peer", pPeer->id); } else { nodeSStatus = TAOS_SYNC_STATUS_START; - sInfo("%s, sync-req is sent", pPeer->id); + sInfo("%s, sync-req is sent to peer, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); } } @@ -915,7 +927,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); } else { - sError("%s, forward discarded, hver:%" PRIu64, pPeer->id, pHead->version); + sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus], + pHead->version); } } } @@ -924,8 +937,10 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus *pPeersStatus = (SPeersStatus *)cont; - sDebug("%s, status msg is received, self:%s sver:%" PRIu64 " peer:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, - syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]); + sDebug("%s, status msg is received, self:%s sstatus:%s sver:%" PRIu64 ", peer:%s sver:%" PRIu64 + ", ack:%d tranId:%u type:%s pfd:%d", + pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role], + pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type], pPeer->peerFd); pPeer->version = pPeersStatus->version; syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role); @@ -982,7 +997,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { } } - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); return code; } @@ -1014,8 +1029,11 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen); if (retLen == statusMsgLen) { - sDebug("%s, status msg is sent, self:%s sver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, syncRole[pPeersStatus->role], - pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]); + sDebug("%s, status msg is sent, self:%s sstatus:%s sver:%" PRIu64 ", peer:%s sstatus:%s sver:%" PRIu64 + ", ack:%d tranId:%u type:%s pfd:%d", + pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], + syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, + statusType[pPeersStatus->type], pPeer->peerFd); } else { sDebug("%s, failed to send status msg, restart", pPeer->id); syncRestartConnection(pPeer); @@ -1048,7 +1066,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { - sDebug("%s, connection to peer server is setup", pPeer->id); + sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d", pPeer->id, connFd, pPeer->syncFd); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); @@ -1069,7 +1087,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { sDebug("%s, check peer connection", pPeer->id); syncSetupPeerConnection(pPeer); - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); } static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { @@ -1135,7 +1153,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { pPeer->syncFd = connFd; syncCreateRestoreDataThread(pPeer); } else { - sDebug("%s, TCP connection is already up, close one", pPeer->id); + sDebug("%s, TCP connection is already up(pfd:%d), close one, new pfd:%d sfd:%d", pPeer->id, pPeer->peerFd, connFd, + pPeer->syncFd); syncClosePeerConn(pPeer); pPeer->peerFd = connFd; pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); @@ -1145,7 +1164,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { } } - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); } static void syncProcessBrokenLink(void *param) { @@ -1156,14 +1175,14 @@ static void syncProcessBrokenLink(void *param) { if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return; pthread_mutex_lock(&pNode->mutex); - sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno)); + sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd); pPeer->peerFd = -1; if (syncDecPeerRef(pPeer) != 0) { syncRestartConnection(pPeer); } - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); taosReleaseRef(tsSyncRefId, pNode->rid); } @@ -1239,10 +1258,13 @@ static void syncMonitorNodeRole(void *param, void *tmrId) { if (index == pNode->selfIndex) continue; SSyncPeer *pPeer = pNode->peerInfo[index]; - if (pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue; - if (pPeer->sstatus > TAOS_SYNC_STATUS_INIT || nodeSStatus > TAOS_SYNC_STATUS_INIT) continue; + if (/*pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && */ nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue; + if (/*pPeer->sstatus > TAOS_SYNC_STATUS_INIT || */ nodeSStatus > TAOS_SYNC_STATUS_INIT) continue; + sDebug("%s, check roles since self:%s sstatus:%s, peer:%s sstatus:%s", pPeer->id, syncRole[pPeer->role], + syncStatus[pPeer->sstatus], syncRole[nodeRole], syncStatus[nodeSStatus]); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId()); + break; } pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); @@ -1271,7 +1293,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { } syncRemoveConfirmedFwdInfo(pNode); - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); } pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); @@ -1339,7 +1361,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle } } - pthread_mutex_unlock(&(pNode->mutex)); + pthread_mutex_unlock(&pNode->mutex); return code; } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index b31ec5f7c7..466c39060a 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -267,7 +267,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { nodeSStatus = TAOS_SYNC_STATUS_FILE; uint64_t fversion = 0; - sDebug("%s, start to restore file", pPeer->id); + sDebug("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); int32_t code = syncRestoreFile(pPeer, &fversion); if (code < 0) { sError("%s, failed to restore file", pPeer->id); @@ -291,7 +291,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { } nodeSStatus = TAOS_SYNC_STATUS_CACHE; - sDebug("%s, start to insert buffered points", pPeer->id); + sDebug("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); if (syncProcessBufferedFwd(pPeer) < 0) { sError("%s, failed to insert buffered points", pPeer->id); return -1; @@ -327,6 +327,8 @@ void *syncRestoreData(void *param) { (*pNode->notifyRole)(pNode->vgId, nodeRole); nodeSStatus = TAOS_SYNC_STATUS_INIT; + sInfo("%s, sync over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); + taosClose(pPeer->syncFd); syncCloseRecvBuffer(pNode); __sync_fetch_and_sub(&tsSyncNum, 1); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index b6dacaa262..04d4ce32cb 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -318,6 +318,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) if (((event & IN_MODIFY) == 0) || once) { if (fversion == 0) { pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt + sDebug("%s, fversion is 0 then set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); fversion = nodeVersion; // must read data to fversion } } @@ -416,8 +417,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { } if (code == 0) { - sInfo("%s, wal retrieve is finished", pPeer->id); pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; + sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + SWalHead walHead; memset(&walHead, 0, sizeof(walHead)); code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); @@ -445,7 +447,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { pPeer->sversion = 0; pPeer->sstatus = TAOS_SYNC_STATUS_FILE; - sInfo("%s, start to retrieve file", pPeer->id); + sInfo("%s, start to retrieve file, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); if (syncRetrieveFile(pPeer) < 0) { sError("%s, failed to retrieve file", pPeer->id); return -1; diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 1be79b7bbd..2543c81708 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -107,7 +107,7 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { while (nleft > 0) { nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft); if (nwritten <= 0) { - if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) + if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) continue; else return -1; @@ -133,7 +133,7 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { if (nread == 0) { break; } else if (nread < 0) { - if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + if (errno == EINTR/* || errno == EAGAIN || errno == EWOULDBLOCK*/) { continue; } else { return -1; -- GitLab