提交 8612f882 编写于 作者: H Haojun Liao

Merge branch '3.0' into feature/3.0_liaohj

...@@ -52,6 +52,11 @@ typedef struct SVState SVState; ...@@ -52,6 +52,11 @@ typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorkerMgmt SQHandle; typedef struct SQWorkerMgmt SQHandle;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
int8_t dstType; int8_t dstType;
......
...@@ -15,9 +15,6 @@ ...@@ -15,9 +15,6 @@
#include "vnodeInt.h" #include "vnodeInt.h"
static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode);
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
...@@ -55,6 +52,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -55,6 +52,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
char tdir[TSDB_FILENAME_LEN * 2];
int ret; int ret;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path); snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
...@@ -83,83 +81,85 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -83,83 +81,85 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
tsem_init(&(pVnode->canCommit), 0, 1); tsem_init(&(pVnode->canCommit), 0, 1);
// open the vnode // open buffer pool
if (vnodeOpenImpl(pVnode) < 0) {
// TODO: handle error
return NULL;
}
return pVnode;
}
void vnodeClose(SVnode *pVnode) {
if (pVnode) {
vnodeCloseImpl(pVnode);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFreeClear(pVnode->path);
taosMemoryFree(pVnode);
}
}
/* ------------------------ STATIC METHODS ------------------------ */
static int vnodeOpenImpl(SVnode *pVnode) {
char dir[TSDB_FILENAME_LEN];
if (vnodeOpenBufPool(pVnode) < 0) { if (vnodeOpenBufPool(pVnode) < 0) {
// TODO: handle error vError("vgId: %d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; goto _err;
} }
// Open meta // open meta
sprintf(dir, "%s/meta", pVnode->path); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_META_DIR);
pVnode->pMeta = metaOpen(dir, vBufPoolGetMAF(pVnode)); pVnode->pMeta = metaOpen(tdir, vBufPoolGetMAF(pVnode));
if (pVnode->pMeta == NULL) { if (pVnode->pMeta == NULL) {
// TODO: handle error vError("vgId: %d failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; goto _err;
} }
// Open tsdb // open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TSDB_DIR);
pVnode->pTsdb = pVnode->pTsdb =
tsdbOpen(dir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); tsdbOpen(tdir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; goto _err;
} }
// Open WAL // open wal
sprintf(dir, "%s/wal", pVnode->path); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) { if (pVnode->pWal == NULL) {
// TODO: handle error vError("vgId: %d failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; goto _err;
} }
// Open TQ // open tq
sprintf(dir, "%s/tq", pVnode->path); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; goto _err;
} }
// Open Query // open query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
return -1; vError("vgId: %d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
} }
// TODO #if 0
return 0; if (vnodeBegin() < 0) {
goto _err;
}
#endif
return pVnode;
_err:
if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFreeClear(pVnode->path);
taosMemoryFree(pVnode);
return NULL;
} }
static void vnodeCloseImpl(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
vnodeSyncCommit(pVnode);
if (pVnode) { if (pVnode) {
vnodeCloseBufPool(pVnode); // commit (TODO: use option to control)
metaClose(pVnode->pMeta); vnodeSyncCommit(pVnode);
tsdbClose(pVnode->pTsdb); // close vnode
tqClose(pVnode->pTq);
walClose(pVnode->pWal);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
walClose(pVnode->pWal);
tqClose(pVnode->pTq);
tsdbClose(pVnode->pTsdb);
metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode);
// destroy handle
tsem_destroy(&(pVnode->canCommit));
taosMemoryFreeClear(pVnode->path);
taosMemoryFree(pVnode);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册