提交 5f09c1f8 编写于 作者: S Shengliang Guan

TD-1715

上级 0e6c237d
...@@ -310,6 +310,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -310,6 +310,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
int code = 0; int code = 0;
if (pNode == NULL) return 0; if (pNode == NULL) return 0;
syncAddNodeRef(pNode);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
...@@ -318,6 +319,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -318,6 +319,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
syncDecNodeRef(pNode);
return TSDB_CODE_SYN_INVALID_VERSION; return TSDB_CODE_SYN_INVALID_VERSION;
} }
...@@ -326,10 +328,16 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -326,10 +328,16 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
qtype, pWalHead->version); qtype, pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) {
syncDecNodeRef(pNode);
return 0;
}
// only pkt from RPC or CQ can be forwarded // only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) {
syncDecNodeRef(pNode);
return 0;
}
// a hacker way to improve the performance // a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
...@@ -361,6 +369,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -361,6 +369,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册