提交 af84d8db 编写于 作者: C Cary Xu

update

上级 7dc3f8e0
......@@ -38,8 +38,6 @@ static void * tsVgIdHash = NULL;
static int32_t tsNodeRefId = -1;
static int32_t tsPeerRefId = -1;
uint64_t gSyncUid = 0;
// local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer);
......@@ -273,9 +271,11 @@ void syncStop(int64_t rid) {
sInfo("vgId:%d, cleanup sync", pNode->vgId);
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
if (tsVgIdHash) taosHashRemove(tsVgIdHash, &pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
......@@ -289,8 +289,10 @@ void syncStop(int64_t rid) {
pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pPeer) syncRemovePeer(pPeer);
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncReleaseNode(pNode);
taosRemoveRef(tsNodeRefId, rid);
......@@ -305,9 +307,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica);
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncStopCheckPeerConn(pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]); // arb
for (int32_t syn_index = 0; syn_index < pNode->replica; ++syn_index) {
......@@ -375,8 +379,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
syncStartCheckPeerConn(pNode->peerInfo[syn_index]);
}
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum);
syncBroadcastStatus(pNode);
......@@ -426,9 +432,11 @@ void syncRecover(int64_t rid) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
nodeVersion = 0;
......@@ -441,8 +449,10 @@ void syncRecover(int64_t rid) {
}
}
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncReleaseNode(pNode);
}
......@@ -941,15 +951,20 @@ static void syncNotStarted(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
pPeer->timer = NULL;
pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
syncRestartConnection(pPeer);
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncReleasePeer(pPeer);
}
......@@ -961,12 +976,17 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncRecoverFromMaster(pPeer);
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncReleasePeer(pPeer);
}
......@@ -1096,9 +1116,11 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
SSyncHead *pHead = buffer;
SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
int32_t code = syncReadPeerMsg(pPeer, pHead);
......@@ -1114,8 +1136,10 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
}
}
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncReleasePeer(pPeer);
return code;
......@@ -1208,15 +1232,19 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
sDebug("%s, check peer connection", pPeer->id);
syncSetupPeerConnection(pPeer);
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncReleasePeer(pPeer);
}
......@@ -1297,10 +1325,12 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
sDebug("vgId:%d, sync connection is incoming, tranId:%u", vgId, msg.tranId);
SSyncNode *pNode = *ppNode;
int64_t rid = -9999;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) {
......@@ -1331,8 +1361,10 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
}
}
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
}
static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
......@@ -1341,9 +1373,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SSyncNode *pNode = pPeer->pSyncNode;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d",
pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp);
......@@ -1353,8 +1387,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
}
syncRestartConnection(pPeer);
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
syncReleasePeer(pPeer);
}
......@@ -1458,9 +1495,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
int64_t syn_time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) {
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
if (ABS(syn_time - pFwdInfo->time) < 10000) break;
......@@ -1471,8 +1510,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
}
syncRemoveConfirmedFwdInfo(pNode);
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
}
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
......@@ -1518,10 +1560,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len);
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
int64_t rid = -9998;
uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
......@@ -1533,8 +1576,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (code >= 0) {
code = 1;
} else {
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
return code;
}
}
......@@ -1550,8 +1595,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
}
}
printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
return code;
}
......@@ -144,9 +144,11 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
forwards++;
}
// uint64_t uid = atomic_add_fetch_32(&gSyncUid, 1);
// printf("propSync_%s_%d rid:%" PRId64 ":uid:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_lock(&pNode->mutex);
int ret = 0;
if ((ret = pthread_mutex_lock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
while (forwards < pRecv->forwards && pRecv->code == 0) {
offset = syncProcessOneBufferedFwd(pPeer, offset);
......@@ -156,8 +158,10 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
nodeRole = TAOS_SYNC_ROLE_SLAVE;
sDebug("%s, finish processing buffered fwds:%d", pPeer->id, forwards);
// printf("propSync_%s_%d rid:%" PRId64 ":uid_ex:%" PRIu64 "\n", __func__, __LINE__, rid, uid);
pthread_mutex_unlock(&pNode->mutex);
if ((ret = pthread_mutex_unlock(&pNode->mutex)) != 0) {
sFatal("%d:: vgId:%d, failed to lock pNode->mutex", __LINE__, pNode->vgId);
exit(ret);
};
return pRecv->code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册