提交 b39ceed0 编写于 作者: S Shengliang Guan

TD-2324

上级 2db5f1e8
......@@ -28,6 +28,7 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeReset(SVnodeObj *pVnode);
void vnodeCleanUp(SVnodeObj *pVnode);
void vnodeDestroy(SVnodeObj *pVnode);
#ifdef __cplusplus
......
......@@ -23,8 +23,8 @@ extern "C" {
int32_t vnodeInitMWorker();
void vnodeCleanupMWorker();
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle);
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle);
int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode);
int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode);
#ifdef __cplusplus
}
......
......@@ -26,8 +26,9 @@
#include "vnodeSync.h"
#include "vnodeVersion.h"
#include "vnodeMgmt.h"
#include "vnodeWorker.h"
#include "vnodeMain.h"
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
......@@ -110,6 +111,8 @@ int32_t vnodeDrop(int32_t vgId) {
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
pVnode->dropped = 1;
// remove from hash, so new messages wont be consumed
vnodeRemoveFromHash(pVnode);
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
......@@ -309,6 +312,7 @@ int32_t vnodeOpen(int32_t vgId) {
if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeRemoveFromHash(pVnode);
vnodeCleanUp(pVnode);
return terrno;
}
......@@ -322,6 +326,7 @@ int32_t vnodeClose(int32_t vgId) {
if (pVnode == NULL) return 0;
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRemoveFromHash(pVnode);
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
......@@ -398,11 +403,7 @@ void vnodeDestroy(SVnodeObj *pVnode) {
tsdbDecCommitRef(vgId);
}
static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed
vnodeRemoveFromHash(pVnode);
void vnodeCleanUp(SVnodeObj *pVnode) {
if (!vnodeInInitStatus(pVnode)) {
// it may be in updateing or reset state, then it shall wait
int32_t i = 0;
......
......@@ -21,10 +21,11 @@
#include "tqueue.h"
#include "tglobal.h"
#include "vnodeWorker.h"
#include "vnodeMain.h"
typedef enum {
VNODE_WORKER_ACTION_CREATE,
VNODE_WORKER_ACTION_DELETE
VNODE_WORKER_ACTION_CLEANUP,
VNODE_WORKER_ACTION_DESTROUY
} EVMWorkerAction;
typedef struct {
......@@ -132,14 +133,11 @@ void vnodeCleanupMWorker() {
vnodeStopMWorker();
}
static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) {
static int32_t vnodeWriteIntoMWorker(SVnodeObj *pVnode, EVMWorkerAction action, void *rpcHandle) {
SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg));
if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID;
pMsg->vgId = vgId;
pMsg->vgId = pVnode->vgId;
pMsg->pVnode = pVnode;
pMsg->rpcHandle = rpcHandle;
pMsg->action = action;
......@@ -150,29 +148,27 @@ static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *
return code;
}
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) {
vTrace("vgId:%d, will open in vmworker", vgId);
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle);
int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) {
vTrace("vgId:%d, will cleanup in vmworker", pVnode->vgId);
return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_CLEANUP, NULL);
}
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) {
vTrace("vgId:%d, will cleanup in vmworker", vgId);
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle);
int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) {
vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId);
return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROUY, NULL);
}
static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) {
vTrace("vgId:%d, disposed in vmworker", pMsg->vgId);
vnodeRelease(pMsg->pVnode);
taosFreeQitem(pMsg);
}
static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) {
SRpcMsg rpcRsp = {
.handle = pMsg->rpcHandle,
.code = pMsg->code,
};
if (pMsg->rpcHandle != NULL) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcHandle, .code = pMsg->code};
rpcSendResponse(&rpcRsp);
}
rpcSendResponse(&rpcRsp);
vnodeFreeMWorkerMsg(pMsg);
}
......@@ -180,11 +176,11 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
pMsg->code = 0;
switch (pMsg->action) {
case VNODE_WORKER_ACTION_CREATE:
pMsg->code = vnodeOpen(pMsg->vgId);
case VNODE_WORKER_ACTION_CLEANUP:
vnodeCleanUp(pMsg->pVnode);
break;
case VNODE_WORKER_ACTION_DELETE:
pMsg->code = vnodeDrop(pMsg->vgId);
case VNODE_WORKER_ACTION_DESTROUY:
vnodeDestroy(pMsg->pVnode);
break;
default:
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册