diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index e656193e1fb3510f34cb271676c0f57790073e47..c095c0c72b0832157a06adfb47b2064cf3cc2c40 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -38,6 +38,8 @@ static void * tsVgIdHash = NULL; static int32_t tsNodeRefId = -1; static int32_t tsPeerRefId = -1; +uint64_t gSyncUid = 0; + // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncRecoverFromMaster(SSyncPeer *pPeer); @@ -271,6 +273,8 @@ void syncStop(int64_t rid) { sInfo("vgId:%d, cleanup sync", pNode->vgId); + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t)); @@ -285,6 +289,7 @@ void syncStop(int64_t rid) { pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; if (pPeer) syncRemovePeer(pPeer); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); syncReleaseNode(pNode); @@ -300,6 +305,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb @@ -368,6 +375,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { syncStartCheckPeerConn(pNode->peerInfo[syn_index]); } + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); @@ -418,6 +426,8 @@ void syncRecover(int64_t rid) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; (*pNode->notifyRoleFp)(pNode->vgId, nodeRole); + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); nodeVersion = 0; @@ -431,6 +441,7 @@ void syncRecover(int64_t rid) { } } + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); syncReleaseNode(pNode); @@ -930,11 +941,14 @@ static void syncNotStarted(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); pPeer->timer = NULL; pPeer->sstatus = TAOS_SYNC_STATUS_INIT; sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); syncRestartConnection(pPeer); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); @@ -947,8 +961,11 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); syncRecoverFromMaster(pPeer); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); @@ -1078,7 +1095,9 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { SSyncHead *pHead = buffer; SSyncNode *pNode = pPeer->pSyncNode; - + + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); int32_t code = syncReadPeerMsg(pPeer, pHead); @@ -1095,6 +1114,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { } } + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); @@ -1188,11 +1208,14 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncNode *pNode = pPeer->pSyncNode; + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); sDebug("%s, check peer connection", pPeer->id); syncSetupPeerConnection(pPeer); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); @@ -1274,6 +1297,9 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { sDebug("vgId:%d, sync connection is incoming, tranId:%u", vgId, msg.tranId); SSyncNode *pNode = *ppNode; + int64_t rid = -9999; + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); SSyncPeer *pPeer; @@ -1305,6 +1331,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { } } + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); } @@ -1314,6 +1341,8 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { SSyncNode *pNode = pPeer->pSyncNode; + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d", @@ -1324,6 +1353,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { } syncRestartConnection(pPeer); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); syncReleasePeer(pPeer); @@ -1428,6 +1458,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { int64_t syn_time = taosGetTimestampMs(); if (pSyncFwds->fwds > 0) { + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; @@ -1439,7 +1471,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { } syncRemoveConfirmedFwdInfo(pNode); - pthread_mutex_unlock(&pNode->mutex); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); + pthread_mutex_unlock(&pNode->mutex); } pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); @@ -1485,6 +1518,9 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len); fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head + int64_t rid = -9998; + uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pNode->replica; ++i) { @@ -1497,6 +1533,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (code >= 0) { code = 1; } else { + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); return code; } @@ -1513,6 +1550,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle } } + sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); pthread_mutex_unlock(&pNode->mutex); return code; diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index bf9d5201a062e048365c2887b7b6a6d70b40547f..3d015501859ac15b1fa2431eaf13442ee7d02675 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -144,6 +144,8 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { forwards++; } + // uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); + // sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_lock(&pNode->mutex); while (forwards < pRecv->forwards && pRecv->code == 0) { @@ -154,6 +156,7 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { nodeRole = TAOS_SYNC_ROLE_SLAVE; sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards); + // sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64, __func__, __LINE__, rid, uid); pthread_mutex_unlock(&pNode->mutex); return pRecv->code; @@ -286,7 +289,7 @@ void *syncRestoreData(void *param) { } else { if (syncRestoreDataStepByStep(pPeer) == 0) { sInfo("%s, it is synced successfully", pPeer->id); - nodeRole = TAOS_SYNC_ROLE_SLAVE; + nodeRole = TAOS_SYNC_ROLE_SLAVE; syncBroadcastStatus(pNode); } else { sError("%s, failed to restore data, restart connection", pPeer->id); @@ -310,3 +313,4 @@ void *syncRestoreData(void *param) { return NULL; } + \ No newline at end of file