提交 d0d56645 编写于 作者: H Hongze Cheng

more

上级 1ab4f2c4
...@@ -71,7 +71,7 @@ typedef struct STbCfg { ...@@ -71,7 +71,7 @@ typedef struct STbCfg {
} STbCfg; } STbCfg;
// SMeta operations // SMeta operations
SMeta *metaOpen(const char *path, const SMetaCfg *pOptions); SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg);
void metaClose(SMeta *pMeta); void metaClose(SMeta *pMeta);
void metaRemove(const char *path); void metaRemove(const char *path);
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
...@@ -79,8 +79,8 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); ...@@ -79,8 +79,8 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta); int metaCommit(SMeta *pMeta);
// Options // Options
void metaOptionsInit(SMetaCfg *pOptions); void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pOptions); void metaOptionsClear(SMetaCfg *pMetaCfg);
// STbCfg // STbCfg
#define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \ #define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \
......
...@@ -264,7 +264,7 @@ typedef struct STQ { ...@@ -264,7 +264,7 @@ typedef struct STQ {
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac); STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac);
void tqDestroy(STQ*); void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, int64_t version);
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_H_ #ifndef _TD_TSDB_H_
#define _TD_TSDB_H_ #define _TD_TSDB_H_
#include "mallocator.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -25,7 +27,7 @@ typedef struct STsdb STsdb; ...@@ -25,7 +27,7 @@ typedef struct STsdb STsdb;
typedef struct STsdbCfg STsdbCfg; typedef struct STsdbCfg STsdbCfg;
// STsdb // STsdb
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg); STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
void tsdbClose(STsdb *); void tsdbClose(STsdb *);
void tsdbRemove(const char *path); void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg); int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg);
......
...@@ -32,6 +32,8 @@ int vnodeBufPoolRecycle(SVnode *pVnode); ...@@ -32,6 +32,8 @@ int vnodeBufPoolRecycle(SVnode *pVnode);
void *vnodeMalloc(SVnode *pVnode, uint64_t size); void *vnodeMalloc(SVnode *pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode *pVnode); bool vnodeBufPoolIsFull(SVnode *pVnode);
SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_MAF_H_
#define _TD_VNODE_MAF_H_
#include "vnode.h"
#ifdef __cplusplus
extern "C" {
#endif
int vnodeOpenMAF(SVnode *pVnode);
void vnodeCloseMAF(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_MAF_H_*/
\ No newline at end of file
...@@ -24,8 +24,8 @@ struct SVBufPool { ...@@ -24,8 +24,8 @@ struct SVBufPool {
TD_DLIST(SVMemAllocator) free; TD_DLIST(SVMemAllocator) free;
TD_DLIST(SVMemAllocator) incycle; TD_DLIST(SVMemAllocator) incycle;
SVMemAllocator *inuse; SVMemAllocator *inuse;
// MAF for submodules // MAF for submodules to use
// SMemAllocatorFactory maf; SMemAllocatorFactory *pMAF;
}; };
int vnodeOpenBufPool(SVnode *pVnode) { int vnodeOpenBufPool(SVnode *pVnode) {
...@@ -125,6 +125,8 @@ bool vnodeBufPoolIsFull(SVnode *pVnode) { ...@@ -125,6 +125,8 @@ bool vnodeBufPoolIsFull(SVnode *pVnode) {
return vmaIsFull(pVnode->pBufPool->inuse); return vmaIsFull(pVnode->pBufPool->inuse);
} }
SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode) { return pVnode->pBufPool->pMAF; }
#if 0 #if 0
typedef enum { typedef enum {
......
...@@ -102,7 +102,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -102,7 +102,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb // Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg)); pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
...@@ -110,7 +110,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -110,7 +110,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// TODO: Open TQ // TODO: Open TQ
sprintf(dir, "%s/tq", pVnode->path); sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, NULL); pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
...@@ -131,7 +131,9 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -131,7 +131,9 @@ static int vnodeOpenImpl(SVnode *pVnode) {
static void vnodeCloseImpl(SVnode *pVnode) { static void vnodeCloseImpl(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
tsdbClose(pVnode->pTsdb);
metaClose(pVnode->pMeta); metaClose(pVnode->pMeta);
tsdbClose(pVnode->pTsdb);
tqClose(pVnode->pTq);
walClose(pVnode->pWal);
} }
} }
\ No newline at end of file
...@@ -17,27 +17,27 @@ ...@@ -17,27 +17,27 @@
#include "metaDef.h" #include "metaDef.h"
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions); static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg);
static void metaFree(SMeta *pMeta); static void metaFree(SMeta *pMeta);
static int metaOpenImpl(SMeta *pMeta); static int metaOpenImpl(SMeta *pMeta);
static void metaCloseImpl(SMeta *pMeta); static void metaCloseImpl(SMeta *pMeta);
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaOptions) { SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
// Set default options // Set default options
if (pMetaOptions == NULL) { if (pMetaCfg == NULL) {
pMetaOptions = &defaultMetaOptions; pMetaCfg = &defaultMetaOptions;
} }
// Validate the options // Validate the options
if (metaValidateOptions(pMetaOptions) < 0) { if (metaValidateOptions(pMetaCfg) < 0) {
// TODO: deal with error // TODO: deal with error
return NULL; return NULL;
} }
// Allocate handle // Allocate handle
pMeta = metaNew(path, pMetaOptions); pMeta = metaNew(path, pMetaCfg);
if (pMeta == NULL) { if (pMeta == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -65,7 +65,7 @@ void metaClose(SMeta *pMeta) { ...@@ -65,7 +65,7 @@ void metaClose(SMeta *pMeta) {
void metaRemove(const char *path) { taosRemoveDir(path); } void metaRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) { static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg) {
SMeta *pMeta; SMeta *pMeta;
size_t psize = strlen(path); size_t psize = strlen(path);
...@@ -80,7 +80,7 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) { ...@@ -80,7 +80,7 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) {
return NULL; return NULL;
} }
metaOptionsCopy(&(pMeta->options), pMetaOptions); metaOptionsCopy(&(pMeta->options), pMetaCfg);
return pMeta; return pMeta;
}; };
......
...@@ -66,6 +66,10 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl ...@@ -66,6 +66,10 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl
return pTq; return pTq;
} }
void tqClose(STQ*pTq) {
// TODO
}
static int tqProtoCheck(TmqMsgHead *pMsg) { static int tqProtoCheck(TmqMsgHead *pMsg) {
return pMsg->protoVer == 0; return pMsg->protoVer == 0;
} }
......
...@@ -15,27 +15,27 @@ ...@@ -15,27 +15,27 @@
#include "tsdbDef.h" #include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions); static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg);
static void tsdbFree(STsdb *pTsdb); static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbOptions) { STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
// Set default TSDB Options // Set default TSDB Options
if (pTsdbOptions == NULL) { if (pTsdbCfg == NULL) {
pTsdbOptions = &defautlTsdbOptions; pTsdbCfg = &defautlTsdbOptions;
} }
// Validate the options // Validate the options
if (tsdbValidateOptions(pTsdbOptions) < 0) { if (tsdbValidateOptions(pTsdbCfg) < 0) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
} }
// Create the handle // Create the handle
pTsdb = tsdbNew(path, pTsdbOptions); pTsdb = tsdbNew(path, pTsdbCfg);
if (pTsdb == NULL) { if (pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -62,7 +62,7 @@ void tsdbClose(STsdb *pTsdb) { ...@@ -62,7 +62,7 @@ void tsdbClose(STsdb *pTsdb) {
void tsdbRemove(const char *path) { taosRemoveDir(path); } void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) { static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
pTsdb = (STsdb *)calloc(1, sizeof(STsdb)); pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
...@@ -72,7 +72,7 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) { ...@@ -72,7 +72,7 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) {
} }
pTsdb->path = strdup(path); pTsdb->path = strdup(path);
tsdbOptionsCopy(&(pTsdb->options), pTsdbOptions); tsdbOptionsCopy(&(pTsdb->options), pTsdbCfg);
return pTsdb; return pTsdb;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册