diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index d80f9f8cd5f24a0bf7f481a2a6949a8e21872aaa..e9c66adaac6c6d70ecbbf3ef59079d344a96099e 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -38,8 +38,6 @@ static void * tsVgIdHash = NULL; static int32_t tsNodeRefId = -1; static int32_t tsPeerRefId = -1; -uint64_t gSyncUid = 0; - // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer); @@ -273,9 +271,11 @@ void syncStop(int64_t rid) { sInfo("vgId:%d, cleanup sync", pNode->vgId); - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t)); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); @@ -289,8 +289,10 @@ void syncStop(int64_t rid) { pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; if (pPeer) syncRemovePeer(pPeer); - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleaseNode(pNode); taosRemoveRef(tsNodeRefId, rid); @@ -305,9 +307,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb for (int32_t syn_index = 0; syn_index < pNode->replica; ++syn_index) { @@ -375,8 +379,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { syncStartCheckPeerConn(pNode->peerInfo[syn_index]); } - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); syncBroadcastStatus(pNode); @@ -426,9 +432,11 @@ void syncRecover(int64_t rid) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; nodeVersion = 0; @@ -441,8 +449,10 @@ void syncRecover(int64_t rid) { } } - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleaseNode(pNode); } @@ -941,15 +951,20 @@ static void syncNotStarted(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; pPeer->timer = NULL; pPeer->sstatus = TAOS_SYNC_STATUS_INIT; sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); syncRestartConnection(pPeer); - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -961,12 +976,17 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncRecoverFromMaster(pPeer); - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -1096,9 +1116,11 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { SSyncHead *pHead = buffer; SSyncNode *pNode = pPeer->pSyncNode; - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; int32_t code = syncReadPeerMsg(pPeer, pHead); @@ -1114,8 +1136,10 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { } } - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); return code; @@ -1208,15 +1232,19 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; sDebug("%s, check peer connection", pPeer->id); syncSetupPeerConnection(pPeer); - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -1297,10 +1325,12 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { sDebug("vgId:%d, sync connection is incoming, tranId:%u", vgId, msg.tranId); SSyncNode *pNode = *ppNode; - int64_t rid = -9999; - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { @@ -1331,8 +1361,10 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { } } - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; } static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { @@ -1341,9 +1373,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { SSyncNode *pNode = pPeer->pSyncNode; - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp); @@ -1353,8 +1387,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { } syncRestartConnection(pPeer); - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; syncReleasePeer(pPeer); } @@ -1458,9 +1495,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { int64_t syn_time = taosGetTimestampMs(); if (pSyncFwds->fwds > 0) { - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; if (ABS(syn_time - pFwdInfo->time) < 10000) break; @@ -1471,8 +1510,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { } syncRemoveConfirmedFwdInfo(pNode); - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; } pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); @@ -1518,10 +1560,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len); fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head - int64_t rid = -9998; - uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; @@ -1533,8 +1576,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (code >= 0) { code = 1; } else { - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; return code; } } @@ -1550,8 +1595,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle } } - printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; return code; } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 79f44438433e710d272b3ebf6accd8793ee3ba35..f8b79a1088b95f26e7c6bc0e768af05eb660e5b2 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -144,9 +144,11 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { forwards++; } - // uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); - // printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_lock(&pNode->mutex); + int ret = 0; + if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; while (forwards < pRecv->forwards && pRecv->code == 0) { offset = syncProcessOneBufferedFwd(pPeer, offset); @@ -156,8 +158,10 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { nodeRole = TAOS_SYNC_ROLE_SLAVE; sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards); - // printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid); - pthread_mutex_unlock(&pNode->mutex); + if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) { + sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId); + exit(ret); + }; return pRecv->code; }