未验证 提交 74d7e80f 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #6693 from taosdata/fix/TD-4593

[TD-4593]<fix>: fix vnode cannnot close while syncing
...@@ -169,7 +169,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -169,7 +169,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); SAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pAlter->cfg.vgId); void *pVnode = vnodeAcquireNotClose(pAlter->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pAlter); int32_t code = vnodeAlter(pVnode, pAlter);
......
...@@ -63,7 +63,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { ...@@ -63,7 +63,7 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
assert(pHead->contLen > 0); assert(pHead->contLen > 0);
void *pVnode = vnodeAcquire(pHead->vgId); void *pVnode = vnodeAcquireNotClose(pHead->vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg); code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++; if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
......
...@@ -85,7 +85,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { ...@@ -85,7 +85,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
pMsg->vgId = htonl(pMsg->vgId); pMsg->vgId = htonl(pMsg->vgId);
pMsg->contLen = htonl(pMsg->contLen); pMsg->contLen = htonl(pMsg->contLen);
void *pVnode = vnodeAcquire(pMsg->vgId); void *pVnode = vnodeAcquireNotClose(pMsg->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
code = TSDB_CODE_VND_INVALID_VGROUP_ID; code = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else { } else {
......
...@@ -68,6 +68,7 @@ int32_t vnodeInitMgmt(); ...@@ -68,6 +68,7 @@ int32_t vnodeInitMgmt();
void vnodeCleanupMgmt(); void vnodeCleanupMgmt();
void* vnodeAcquire(int32_t vgId); void* vnodeAcquire(int32_t vgId);
void vnodeRelease(void *pVnode); void vnodeRelease(void *pVnode);
void* vnodeAcquireNotClose(int32_t vgId);
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *pStatus); void vnodeBuildStatusMsg(void *pStatus);
......
...@@ -41,6 +41,8 @@ typedef struct { ...@@ -41,6 +41,8 @@ typedef struct {
int32_t queuedWMsg; int32_t queuedWMsg;
int32_t queuedRMsg; int32_t queuedRMsg;
int32_t flowctrlLevel; int32_t flowctrlLevel;
int8_t preClose; // drop and close switch
int8_t reserved[3];
int64_t sequence; // for topic int64_t sequence; // for topic
int8_t status; int8_t status;
int8_t role; int8_t role;
......
...@@ -93,7 +93,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { ...@@ -93,7 +93,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
} }
int32_t vnodeSync(int32_t vgId) { int32_t vnodeSync(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquireNotClose(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vDebug("vgId:%d, failed to sync, vnode not find", vgId); vDebug("vgId:%d, failed to sync, vnode not find", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
...@@ -115,7 +115,7 @@ int32_t vnodeSync(int32_t vgId) { ...@@ -115,7 +115,7 @@ int32_t vnodeSync(int32_t vgId) {
} }
int32_t vnodeDrop(int32_t vgId) { int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquireNotClose(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vDebug("vgId:%d, failed to drop, vnode not find", vgId); vDebug("vgId:%d, failed to drop, vnode not find", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
...@@ -390,15 +390,16 @@ int32_t vnodeOpen(int32_t vgId) { ...@@ -390,15 +390,16 @@ int32_t vnodeOpen(int32_t vgId) {
} }
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquireNotClose(vgId);
if (pVnode == NULL) return 0; if (pVnode == NULL) return 0;
if (pVnode->dropped) { if (pVnode->dropped) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
return 0; return 0;
} }
pVnode->preClose = 1;
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRemoveFromHash(pVnode);
vnodeRelease(pVnode); vnodeRelease(pVnode);
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
......
...@@ -125,6 +125,18 @@ void vnodeRelease(void *vparam) { ...@@ -125,6 +125,18 @@ void vnodeRelease(void *vparam) {
} }
} }
void *vnodeAcquireNotClose(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode != NULL && pVnode->preClose == 1) {
vnodeRelease(pVnode);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist, pre closing", vgId);
return NULL;
}
return pVnode;
}
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
int64_t totalStorage = 0; int64_t totalStorage = 0;
int64_t compStorage = 0; int64_t compStorage = 0;
...@@ -187,7 +199,7 @@ void vnodeBuildStatusMsg(void *param) { ...@@ -187,7 +199,7 @@ void vnodeBuildStatusMsg(void *param) {
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId); pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId); SVnodeObj *pVnode = vnodeAcquireNotClose(pAccess[i].vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState; pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
......
...@@ -396,10 +396,13 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { ...@@ -396,10 +396,13 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
} }
void vnodeWaitWriteCompleted(SVnodeObj *pVnode) { void vnodeWaitWriteCompleted(SVnodeObj *pVnode) {
int32_t extraSleep = 0;
while (pVnode->queuedWMsg > 0) { while (pVnode->queuedWMsg > 0) {
vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg); vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg);
taosMsleep(10); taosMsleep(10);
extraSleep = 1;
} }
taosMsleep(900); if (extraSleep)
taosMsleep(900);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册