提交 7dc3f8e0 编写于 作者: C Cary Xu

update

上级 eee026eb
...@@ -274,7 +274,7 @@ void syncStop(int64_t rid) { ...@@ -274,7 +274,7 @@ void syncStop(int64_t rid) {
sInfo("vgId:%d, cleanup sync", pNode->vgId); sInfo("vgId:%d, cleanup sync", pNode->vgId);
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t)); if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t));
...@@ -289,7 +289,7 @@ void syncStop(int64_t rid) { ...@@ -289,7 +289,7 @@ void syncStop(int64_t rid) {
pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pPeer) syncRemovePeer(pPeer); if (pPeer) syncRemovePeer(pPeer);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
syncReleaseNode(pNode); syncReleaseNode(pNode);
...@@ -306,7 +306,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -306,7 +306,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
pNode->replica); pNode->replica);
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb
...@@ -375,7 +375,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -375,7 +375,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
syncStartCheckPeerConn(pNode->peerInfo[syn_index]); syncStartCheckPeerConn(pNode->peerInfo[syn_index]);
} }
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum);
...@@ -427,7 +427,7 @@ void syncRecover(int64_t rid) { ...@@ -427,7 +427,7 @@ void syncRecover(int64_t rid) {
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole); (*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
nodeVersion = 0; nodeVersion = 0;
...@@ -441,7 +441,7 @@ void syncRecover(int64_t rid) { ...@@ -441,7 +441,7 @@ void syncRecover(int64_t rid) {
} }
} }
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
syncReleaseNode(pNode); syncReleaseNode(pNode);
...@@ -942,13 +942,13 @@ static void syncNotStarted(void *param, void *tmrId) { ...@@ -942,13 +942,13 @@ static void syncNotStarted(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
pPeer->timer = NULL; pPeer->timer = NULL;
pPeer->sstatus = TAOS_SYNC_STATUS_INIT; pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
...@@ -962,10 +962,10 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { ...@@ -962,10 +962,10 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
syncRecoverFromMaster(pPeer); syncRecoverFromMaster(pPeer);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
...@@ -1097,7 +1097,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { ...@@ -1097,7 +1097,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
int32_t code = syncReadPeerMsg(pPeer, pHead); int32_t code = syncReadPeerMsg(pPeer, pHead);
...@@ -1114,7 +1114,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { ...@@ -1114,7 +1114,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
} }
} }
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
...@@ -1209,13 +1209,13 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { ...@@ -1209,13 +1209,13 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
sDebug("%s, check peer connection", pPeer->id); sDebug("%s, check peer connection", pPeer->id);
syncSetupPeerConnection(pPeer); syncSetupPeerConnection(pPeer);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
...@@ -1299,7 +1299,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { ...@@ -1299,7 +1299,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
SSyncNode *pNode = *ppNode; SSyncNode *pNode = *ppNode;
int64_t rid = -9999; int64_t rid = -9999;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
SSyncPeer *pPeer; SSyncPeer *pPeer;
...@@ -1331,7 +1331,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { ...@@ -1331,7 +1331,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
} }
} }
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
} }
...@@ -1342,7 +1342,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { ...@@ -1342,7 +1342,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d", sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d",
...@@ -1353,7 +1353,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { ...@@ -1353,7 +1353,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
} }
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
...@@ -1459,7 +1459,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1459,7 +1459,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
...@@ -1471,8 +1471,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1471,8 +1471,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
} }
syncRemoveConfirmedFwdInfo(pNode); syncRemoveConfirmedFwdInfo(pNode);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
...@@ -1520,7 +1520,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1520,7 +1520,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
int64_t rid = -9998; int64_t rid = -9998;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
...@@ -1533,7 +1533,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1533,7 +1533,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (code >= 0) { if (code >= 0) {
code = 1; code = 1;
} else { } else {
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
return code; return code;
} }
...@@ -1550,7 +1550,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1550,7 +1550,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
} }
} }
sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64,__func__, __LINE__,rid, uid); printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
return code; return code;
......
...@@ -145,7 +145,7 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { ...@@ -145,7 +145,7 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
} }
// uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1); // uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
// sInfo("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64, __func__, __LINE__, rid, uid); // printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
while (forwards < pRecv->forwards && pRecv->code == 0) { while (forwards < pRecv->forwards && pRecv->code == 0) {
...@@ -156,7 +156,7 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { ...@@ -156,7 +156,7 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards); sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards);
// sInfo("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64, __func__, __LINE__, rid, uid); // printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex); pthread_mutex_unlock(&pNode->mutex);
return pRecv->code; return pRecv->code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册