diff --git a/src/os/inc/osSocket.h b/src/os/inc/osSocket.h index 63498596113aea603b41cd53fdde03f44d021ce3..52ea4888011b6150b947198b6a47ac611ae02846 100644 --- a/src/os/inc/osSocket.h +++ b/src/os/inc/osSocket.h @@ -15,6 +15,7 @@ #ifndef TDENGINE_OS_SOCKET_H #define TDENGINE_OS_SOCKET_H + #ifdef __cplusplus extern "C" { #endif @@ -32,17 +33,13 @@ extern "C" { #define taosReadSocket(fd, buf, len) read(fd, buf, len) #define taosWriteSocket(fd, buf, len) write(fd, buf, len) #define taosCloseSocketNoCheck(x) close(x) -static FORCE_INLINE int32_t taosCloseSocket(int x) { - int32_t ret = 0; - if (x > STDERR_FILENO) { - ret = close(x); - x = ((int32_t)-1); - } - if (ret != 0) { - assert(false); +#define taosCloseSocket(x) \ + { \ + if (FD_VALID(x)) { \ + close(x); \ + x = FD_INITIALIZER; \ + } \ } - return ret; -} #endif #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 10cff04494151eb4106bd48edeec4c5e8147a228..e656193e1fb3510f34cb271676c0f57790073e47 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -271,11 +271,7 @@ void syncStop(int64_t rid) { sInfo("vgId:%d, cleanup sync", pNode->vgId); - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t)); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); @@ -289,10 +285,7 @@ void syncStop(int64_t rid) { pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; if (pPeer) syncRemovePeer(pPeer); - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); syncReleaseNode(pNode); taosRemoveRef(tsNodeRefId, rid); @@ -307,11 +300,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb for (int32_t syn_index = 0; syn_index < pNode->replica; ++syn_index) { @@ -379,10 +368,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { syncStartCheckPeerConn(pNode->peerInfo[syn_index]); } - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); syncBroadcastStatus(pNode); @@ -432,11 +418,7 @@ void syncRecover(int64_t rid) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); nodeVersion = 0; @@ -449,10 +431,7 @@ void syncRecover(int64_t rid) { } } - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); syncReleaseNode(pNode); } @@ -566,10 +545,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd); taosTmrStopA(&pPeer->timer); - - if (taosCloseSocket(pPeer->syncFd) != 0) { - exit(false); - } + taosCloseSocket(pPeer->syncFd); if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; void *pConn = pPeer->pConn; @@ -895,9 +871,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) { int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, check peer connection in 1000 ms", pPeer->id); - if (!taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer)) { - assert(false); - } + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); } } @@ -956,20 +930,12 @@ static void syncNotStarted(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); pPeer->timer = NULL; pPeer->sstatus = TAOS_SYNC_STATUS_INIT; sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); syncRestartConnection(pPeer); - - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); } @@ -981,17 +947,9 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); syncRecoverFromMaster(pPeer); - - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); } @@ -1120,12 +1078,8 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { SSyncHead *pHead = buffer; SSyncNode *pNode = pPeer->pSyncNode; - - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + + pthread_mutex_lock(&pNode->mutex); int32_t code = syncReadPeerMsg(pPeer, pHead); @@ -1141,10 +1095,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { } } - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); return code; @@ -1237,19 +1188,12 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); sDebug("%s, check peer connection", pPeer->id); syncSetupPeerConnection(pPeer); - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); } @@ -1330,12 +1274,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { sDebug("vgId:%d, sync connection is incoming, tranId:%u", vgId, msg.tranId); SSyncNode *pNode = *ppNode; - - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { @@ -1366,10 +1305,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { } } - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); } static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { @@ -1378,11 +1314,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { SSyncNode *pNode = pPeer->pSyncNode; - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp); @@ -1392,11 +1324,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { } syncRestartConnection(pPeer); - - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); } @@ -1500,11 +1428,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { int64_t syn_time = taosGetTimestampMs(); if (pSyncFwds->fwds > 0) { - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; if (ABS(syn_time - pFwdInfo->time) < 10000) break; @@ -1515,11 +1439,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { } syncRemoveConfirmedFwdInfo(pNode); - - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); } pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); @@ -1565,11 +1485,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len); fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; @@ -1581,10 +1497,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (code >= 0) { code = 1; } else { - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); return code; } } @@ -1600,10 +1513,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle } } - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); return code; } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index f8b79a1088b95f26e7c6bc0e768af05eb660e5b2..bf9d5201a062e048365c2887b7b6a6d70b40547f 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -144,11 +144,7 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { forwards++; } - int ret = 0; - if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_lock(&pNode->mutex); while (forwards < pRecv->forwards && pRecv->code == 0) { offset = syncProcessOneBufferedFwd(pPeer, offset); @@ -158,10 +154,7 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { nodeRole = TAOS_SYNC_ROLE_SLAVE; sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards); - if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { - sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); - exit(ret); - }; + pthread_mutex_unlock(&pNode->mutex); return pRecv->code; } @@ -293,7 +286,7 @@ void *syncRestoreData(void *param) { } else { if (syncRestoreDataStepByStep(pPeer) == 0) { sInfo("%s, it is synced successfully", pPeer->id); - nodeRole = TAOS_SYNC_ROLE_SLAVE; + nodeRole = TAOS_SYNC_ROLE_SLAVE; syncBroadcastStatus(pNode); } else { sError("%s, failed to restore data, restart connection", pPeer->id); @@ -317,4 +310,3 @@ void *syncRestoreData(void *param) { return NULL; } - \ No newline at end of file diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 8ca5b63ba17ff04cd8c64e0ed1d47b414c83ed5a..ccb0a67e5ca99441a1c3026fee498c36795518c5 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -168,9 +168,7 @@ void syncFreeTcpConn(void *param) { sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); pConn->closedByApp = 1; - if (shutdown(pConn->fd, SHUT_WR) != 0) { - ASSERT(false); - } + shutdown(pConn->fd, SHUT_WR); } static void taosProcessBrokenLink(SConnObj *pConn) { diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index f9ba6be63e90fcca3e632933d7e5433102031a19..865e1159c1995b2796682d64ee06de02442b7a25 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -445,7 +445,6 @@ bool taosTmrStop(tmr_h timerId) { bool taosTmrStopA(tmr_h* timerId) { bool ret = taosTmrStop(*timerId); *timerId = NULL; - return ret; }