提交 7e5d97f0 编写于 作者: S Shengliang Guan

fix: stop snapshot receiver on sync post close

上级 84ec8a7f
...@@ -232,6 +232,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo); ...@@ -232,6 +232,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid); int32_t syncStart(int64_t rid);
void syncStop(int64_t rid); void syncStop(int64_t rid);
void syncPreStop(int64_t rid); void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
......
...@@ -98,6 +98,7 @@ bool vnodeShouldRollback(SVnode* pVnode); ...@@ -98,6 +98,7 @@ bool vnodeShouldRollback(SVnode* pVnode);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode); int32_t vnodeSyncStart(SVnode* pVnode);
void vnodeSyncPreClose(SVnode* pVnode); void vnodeSyncPreClose(SVnode* pVnode);
void vnodeSyncPostClose(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
......
...@@ -245,9 +245,12 @@ _err: ...@@ -245,9 +245,12 @@ _err:
return NULL; return NULL;
} }
void vnodePreClose(SVnode *pVnode) { vnodeQueryPreClose(pVnode); } void vnodePreClose(SVnode *pVnode) {
vnodeQueryPreClose(pVnode);
vnodeSyncPreClose(pVnode);
}
void vnodePostClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); } void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
......
...@@ -614,6 +614,11 @@ void vnodeSyncPreClose(SVnode *pVnode) { ...@@ -614,6 +614,11 @@ void vnodeSyncPreClose(SVnode *pVnode) {
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
} }
void vnodeSyncPostClose(SVnode *pVnode) {
vInfo("vgId:%d, post close sync", pVnode->config.vgId);
syncPostStop(pVnode->sync);
}
void vnodeSyncClose(SVnode *pVnode) { void vnodeSyncClose(SVnode *pVnode) {
vInfo("vgId:%d, close sync", pVnode->config.vgId); vInfo("vgId:%d, close sync", pVnode->config.vgId);
syncStop(pVnode->sync); syncStop(pVnode->sync);
......
...@@ -228,6 +228,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode); ...@@ -228,6 +228,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode);
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode);
void syncNodePostClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq);
int32_t syncNodeRestore(SSyncNode* pSyncNode); int32_t syncNodeRestore(SSyncNode* pSyncNode);
void syncHbTimerDataFree(SSyncHbTimerData* pData); void syncHbTimerDataFree(SSyncHbTimerData* pData);
......
...@@ -124,6 +124,14 @@ void syncPreStop(int64_t rid) { ...@@ -124,6 +124,14 @@ void syncPreStop(int64_t rid) {
} }
} }
void syncPostStop(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode != NULL) {
syncNodePostClose(pSyncNode);
syncNodeRelease(pSyncNode);
}
}
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
if (!syncNodeInConfig(pSyncNode, pCfg)) return false; if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1; return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
...@@ -1236,6 +1244,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { ...@@ -1236,6 +1244,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
} }
} }
#if 0
if (pSyncNode->pNewNodeReceiver != NULL) { if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
...@@ -1246,6 +1255,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { ...@@ -1246,6 +1255,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver); snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL; pSyncNode->pNewNodeReceiver = NULL;
} }
#endif
// stop elect timer // stop elect timer
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
...@@ -1257,6 +1267,19 @@ void syncNodePreClose(SSyncNode* pSyncNode) { ...@@ -1257,6 +1267,19 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
syncRespCleanRsp(pSyncNode->pSyncRespMgr); syncRespCleanRsp(pSyncNode->pSyncRespMgr);
} }
void syncNodePostClose(SSyncNode* pSyncNode) {
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
}
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册