提交 94d96109 编写于 作者: H Haojun Liao

[TD-225]merge develop.

...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tref.h"
#include "tsync.h" #include "tsync.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnode.h" #include "dnode.h"
...@@ -28,7 +29,9 @@ ...@@ -28,7 +29,9 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
static SBnMgmt tsBnMgmt;; extern int64_t tsDnodeRid;
extern int64_t tsSdbRid;
static SBnMgmt tsBnMgmt;
static void bnMonitorDnodeModule(); static void bnMonitorDnodeModule();
static void bnLock() { static void bnLock() {
...@@ -529,6 +532,9 @@ void bnCheckStatus() { ...@@ -529,6 +532,9 @@ void bnCheckStatus() {
void * pIter = NULL; void * pIter = NULL;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
void *dnodeSdb = taosAcquireRef(tsSdbRid, tsDnodeRid);
if (dnodeSdb == NULL) return;
while (1) { while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode); pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break; if (pDnode == NULL) break;
...@@ -543,6 +549,8 @@ void bnCheckStatus() { ...@@ -543,6 +549,8 @@ void bnCheckStatus() {
} }
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
taosReleaseRef(tsSdbRid, tsDnodeRid);
} }
void bnCheckModules() { void bnCheckModules() {
......
...@@ -366,7 +366,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand ...@@ -366,7 +366,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
queue->numOfItems--; queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems); uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, pNode->type, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
......
...@@ -28,6 +28,7 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); ...@@ -28,6 +28,7 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
int32_t vnodeReset(SVnodeObj *pVnode); int32_t vnodeReset(SVnodeObj *pVnode);
void vnodeCleanUp(SVnodeObj *pVnode);
void vnodeDestroy(SVnodeObj *pVnode); void vnodeDestroy(SVnodeObj *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -23,8 +23,8 @@ extern "C" { ...@@ -23,8 +23,8 @@ extern "C" {
int32_t vnodeInitMWorker(); int32_t vnodeInitMWorker();
void vnodeCleanupMWorker(); void vnodeCleanupMWorker();
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle); int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode);
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle); int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,71 +25,12 @@ ...@@ -25,71 +25,12 @@
#include "vnodeStatus.h" #include "vnodeStatus.h"
#include "vnodeSync.h" #include "vnodeSync.h"
#include "vnodeVersion.h" #include "vnodeVersion.h"
#include "vnodeMgmt.h"
#include "vnodeWorker.h"
#include "vnodeMain.h"
#include "vnodeCancel.h" #include "vnodeCancel.h"
static SHashObj*tsVnodesHash; static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(int32_t vgId, int8_t role);
static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
#ifndef _SYNC
int64_t syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype) { return 0; }
void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg *cfg) { return 0; }
int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; }
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
int32_t vnodeInitResources() {
int32_t code = syncInit();
if (code != 0) return code;
vnodeInitWriteFp();
vnodeInitReadFp();
vnodeInitCWorker();
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodesHash == NULL) {
vError("failed to init vnode list");
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) {
vError("failed to init vnode commit queue");
return terrno;
}
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupResources() {
vnodeCleanupCWorker();
tsdbDestroyCommitQueue();
if (tsVnodesHash != NULL) {
vDebug("vnode list is cleanup");
taosHashCleanup(tsVnodesHash);
tsVnodesHash = NULL;
}
syncCleanUp();
}
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
int32_t code; int32_t code;
...@@ -171,8 +112,10 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -171,8 +112,10 @@ int32_t vnodeDrop(int32_t vgId) {
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
pVnode->dropped = 1; pVnode->dropped = 1;
// remove from hash, so new messages wont be consumed
vnodeRemoveFromHash(pVnode);
vnodeRelease(pVnode); vnodeRelease(pVnode);
vnodeCleanUp(pVnode); vnodeCleanupInMWorker(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -370,6 +313,7 @@ int32_t vnodeOpen(int32_t vgId) { ...@@ -370,6 +313,7 @@ int32_t vnodeOpen(int32_t vgId) {
if (pVnode->sync <= 0) { if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, vError("vgId:%d, failed to open sync, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno)); tstrerror(terrno));
vnodeRemoveFromHash(pVnode);
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
} }
...@@ -383,6 +327,7 @@ int32_t vnodeClose(int32_t vgId) { ...@@ -383,6 +327,7 @@ int32_t vnodeClose(int32_t vgId) {
if (pVnode == NULL) return 0; if (pVnode == NULL) return 0;
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRemoveFromHash(pVnode);
vnodeRelease(pVnode); vnodeRelease(pVnode);
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
...@@ -459,11 +404,7 @@ void vnodeDestroy(SVnodeObj *pVnode) { ...@@ -459,11 +404,7 @@ void vnodeDestroy(SVnodeObj *pVnode) {
tsdbDecCommitRef(vgId); tsdbDecCommitRef(vgId);
} }
void vnodeCleanUp(SVnodeObj *pVnode) {
static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed
vnodeRemoveFromHash(pVnode);
if (!vnodeInInitStatus(pVnode)) { if (!vnodeInInitStatus(pVnode)) {
// it may be in updateing or reset state, then it shall wait // it may be in updateing or reset state, then it shall wait
int32_t i = 0; int32_t i = 0;
......
...@@ -114,7 +114,7 @@ void vnodeRelease(void *vparam) { ...@@ -114,7 +114,7 @@ void vnodeRelease(void *vparam) {
} }
} else { } else {
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode); vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode);
vnodeDestroy(pVnode); vnodeDestroyInMWorker(pVnode);
int32_t count = taosHashGetSize(tsVnodesHash); int32_t count = taosHashGetSize(tsVnodesHash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", pVnode->vgId, count); vDebug("vgId:%d, vnode is destroyed, vnodes:%d", pVnode->vgId, count);
} }
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#define _NON_BLOCKING_RETRIEVE 0
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tqueue.h" #include "tqueue.h"
...@@ -381,8 +381,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -381,8 +381,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true; freeHandle = true;
} else { // result is not ready, return immediately } else { // result is not ready, return immediately
assert(buildRes == true);
// Only effects in the non-blocking model // Only effects in the non-blocking model
if (!tsHalfCoresForQuery) { if (!tsHalfCoresForQuery) {
if (!buildRes) { if (!buildRes) {
......
...@@ -21,10 +21,11 @@ ...@@ -21,10 +21,11 @@
#include "tqueue.h" #include "tqueue.h"
#include "tglobal.h" #include "tglobal.h"
#include "vnodeWorker.h" #include "vnodeWorker.h"
#include "vnodeMain.h"
typedef enum { typedef enum {
VNODE_WORKER_ACTION_CREATE, VNODE_WORKER_ACTION_CLEANUP,
VNODE_WORKER_ACTION_DELETE VNODE_WORKER_ACTION_DESTROUY
} EVMWorkerAction; } EVMWorkerAction;
typedef struct { typedef struct {
...@@ -132,14 +133,11 @@ void vnodeCleanupMWorker() { ...@@ -132,14 +133,11 @@ void vnodeCleanupMWorker() {
vnodeStopMWorker(); vnodeStopMWorker();
} }
static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) { static int32_t vnodeWriteIntoMWorker(SVnodeObj *pVnode, EVMWorkerAction action, void *rpcHandle) {
SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg)); SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg));
if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY; if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;
SVnodeObj *pVnode = vnodeAcquire(vgId); pMsg->vgId = pVnode->vgId;
if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID;
pMsg->vgId = vgId;
pMsg->pVnode = pVnode; pMsg->pVnode = pVnode;
pMsg->rpcHandle = rpcHandle; pMsg->rpcHandle = rpcHandle;
pMsg->action = action; pMsg->action = action;
...@@ -150,29 +148,27 @@ static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void * ...@@ -150,29 +148,27 @@ static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *
return code; return code;
} }
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) { int32_t vnodeCleanupInMWorker(SVnodeObj *pVnode) {
vTrace("vgId:%d, will open in vmworker", vgId); vTrace("vgId:%d, will cleanup in vmworker", pVnode->vgId);
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle); return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_CLEANUP, NULL);
} }
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) { int32_t vnodeDestroyInMWorker(SVnodeObj *pVnode) {
vTrace("vgId:%d, will cleanup in vmworker", vgId); vTrace("vgId:%d, will destroy in vmworker", pVnode->vgId);
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle); return vnodeWriteIntoMWorker(pVnode, VNODE_WORKER_ACTION_DESTROUY, NULL);
} }
static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) { static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) {
vTrace("vgId:%d, disposed in vmworker", pMsg->vgId); vTrace("vgId:%d, disposed in vmworker", pMsg->vgId);
vnodeRelease(pMsg->pVnode);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) { static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) {
SRpcMsg rpcRsp = { if (pMsg->rpcHandle != NULL) {
.handle = pMsg->rpcHandle, SRpcMsg rpcRsp = {.handle = pMsg->rpcHandle, .code = pMsg->code};
.code = pMsg->code, rpcSendResponse(&rpcRsp);
}; }
rpcSendResponse(&rpcRsp);
vnodeFreeMWorkerMsg(pMsg); vnodeFreeMWorkerMsg(pMsg);
} }
...@@ -180,11 +176,11 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) { ...@@ -180,11 +176,11 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
pMsg->code = 0; pMsg->code = 0;
switch (pMsg->action) { switch (pMsg->action) {
case VNODE_WORKER_ACTION_CREATE: case VNODE_WORKER_ACTION_CLEANUP:
pMsg->code = vnodeOpen(pMsg->vgId); vnodeCleanUp(pMsg->pVnode);
break; break;
case VNODE_WORKER_ACTION_DELETE: case VNODE_WORKER_ACTION_DESTROUY:
pMsg->code = vnodeDrop(pMsg->vgId); vnodeDestroy(pMsg->pVnode);
break; break;
default: default:
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册