diff --git a/src/os/inc/osSocket.h b/src/os/inc/osSocket.h index a172c0bf3465257fe5590ae0a7552cf699a61c10..63498596113aea603b41cd53fdde03f44d021ce3 100644 --- a/src/os/inc/osSocket.h +++ b/src/os/inc/osSocket.h @@ -15,7 +15,6 @@ #ifndef TDENGINE_OS_SOCKET_H #define TDENGINE_OS_SOCKET_H - #ifdef __cplusplus extern "C" { #endif @@ -33,13 +32,17 @@ extern "C" { #define taosReadSocket(fd, buf, len) read(fd, buf, len) #define taosWriteSocket(fd, buf, len) write(fd, buf, len) #define taosCloseSocketNoCheck(x) close(x) - #define taosCloseSocket(x) \ - { \ - if (FD_VALID(x)) { \ - close(x); \ - x = FD_INITIALIZER; \ - } \ - } +static FORCE_INLINE int32_t taosCloseSocket(int x) { + int32_t ret = 0; + if (x > STDERR_FILENO) { + ret = close(x); + x = ((int32_t)-1); + } + if (ret != 0) { + assert(false); + } + return ret; +} #endif #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index e656193e1fb3510f34cb271676c0f57790073e47..10cff04494151eb4106bd48edeec4c5e8147a228 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -271,7 +271,11 @@ void syncStop(int64_t rid) { sInfo("vgId:%d, cleanup sync", pNode->vgId); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t)); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); @@ -285,7 +289,10 @@ void syncStop(int64_t rid) { pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; if (pPeer) syncRemovePeer(pPeer); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleaseNode(pNode); taosRemoveRef(tsNodeRefId, rid); @@ -300,7 +307,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb for (int32_t syn_index = 0; syn_index < pNode->replica; ++syn_index) { @@ -368,7 +379,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { syncStartCheckPeerConn(pNode->peerInfo[syn_index]); } - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); syncBroadcastStatus(pNode); @@ -418,7 +432,11 @@ void syncRecover(int64_t rid) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; nodeVersion = 0; @@ -431,7 +449,10 @@ void syncRecover(int64_t rid) { } } - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleaseNode(pNode); } @@ -545,7 +566,10 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd); taosTmrStopA(&pPeer->timer); - taosCloseSocket(pPeer->syncFd); + + if (taosCloseSocket(pPeer->syncFd) != 0) { + exit(false); + } if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; void *pConn = pPeer->pConn; @@ -871,7 +895,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) { int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, check peer connection in 1000 ms", pPeer->id); - taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); + if (!taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer)) { + assert(false); + } } } @@ -930,12 +956,20 @@ static void syncNotStarted(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; pPeer->timer = NULL; pPeer->sstatus = TAOS_SYNC_STATUS_INIT; sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); syncRestartConnection(pPeer); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -947,9 +981,17 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncRecoverFromMaster(pPeer); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -1078,8 +1120,12 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { SSyncHead *pHead = buffer; SSyncNode *pNode = pPeer->pSyncNode; - - pthread_mutex_lock(&pNode->mutex); + + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; int32_t code = syncReadPeerMsg(pPeer, pHead); @@ -1095,7 +1141,10 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { } } - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); return code; @@ -1188,12 +1237,19 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; sDebug("%s, check peer connection", pPeer->id); syncSetupPeerConnection(pPeer); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -1274,7 +1330,12 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { sDebug("vgId:%d, sync connection is incoming, tranId:%u", vgId, msg.tranId); SSyncNode *pNode = *ppNode; - pthread_mutex_lock(&pNode->mutex); + + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { @@ -1305,7 +1366,10 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { } } - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; } static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { @@ -1314,7 +1378,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { SSyncNode *pNode = pPeer->pSyncNode; - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp); @@ -1324,7 +1392,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { } syncRestartConnection(pPeer); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -1428,7 +1500,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { int64_t syn_time = taosGetTimestampMs(); if (pSyncFwds->fwds > 0) { - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; if (ABS(syn_time - pFwdInfo->time) < 10000) break; @@ -1439,7 +1515,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { } syncRemoveConfirmedFwdInfo(pNode); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; } pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); @@ -1485,7 +1565,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len); fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; @@ -1497,7 +1581,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (code >= 0) { code = 1; } else { - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; return code; } } @@ -1513,7 +1600,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle } } - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; return code; } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index bf9d5201a062e048365c2887b7b6a6d70b40547f..f8b79a1088b95f26e7c6bc0e768af05eb660e5b2 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -144,7 +144,11 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { forwards++; } - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; while (forwards < pRecv->forwards && pRecv->code == 0) { offset = syncProcessOneBufferedFwd(pPeer, offset); @@ -154,7 +158,10 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { nodeRole = TAOS_SYNC_ROLE_SLAVE; sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; return pRecv->code; } @@ -286,7 +293,7 @@ void *syncRestoreData(void *param) { } else { if (syncRestoreDataStepByStep(pPeer) == 0) { sInfo("%s, it is synced successfully", pPeer->id); - nodeRole = TAOS_SYNC_ROLE_SLAVE; + nodeRole = TAOS_SYNC_ROLE_SLAVE; syncBroadcastStatus(pNode); } else { sError("%s, failed to restore data, restart connection", pPeer->id); @@ -310,3 +317,4 @@ void *syncRestoreData(void *param) { return NULL; } + \ No newline at end of file diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index ccb0a67e5ca99441a1c3026fee498c36795518c5..8ca5b63ba17ff04cd8c64e0ed1d47b414c83ed5a 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -168,7 +168,9 @@ void syncFreeTcpConn(void *param) { sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); pConn->closedByApp = 1; - shutdown(pConn->fd, SHUT_WR); + if (shutdown(pConn->fd, SHUT_WR) != 0) { + ASSERT(false); + } } static void taosProcessBrokenLink(SConnObj *pConn) { diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 865e1159c1995b2796682d64ee06de02442b7a25..f9ba6be63e90fcca3e632933d7e5433102031a19 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -445,6 +445,7 @@ bool taosTmrStop(tmr_h timerId) { bool taosTmrStopA(tmr_h* timerId) { bool ret = taosTmrStop(*timerId); *timerId = NULL; + return ret; }