提交 249de584 编写于 作者: H Hongze Cheng

more refact

上级 593adbd5
......@@ -13,8 +13,8 @@ target_sources(
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeWrite.c"
# "src/vnd/vnodeModule.c"
"src/vnd/vnodeMgr.c"
"src/vnd/vnodeModule.c"
# "src/vnd/vnodeMgr.c"
# meta
# "src/meta/metaBDBImpl.c"
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
// vndDebug ====================
// vnodeDebug ====================
// clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
......@@ -30,6 +30,61 @@ extern "C" {
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
// vnodeModule ====================
int vnodeScheduleTask(int (*execute)(void*), void* arg);
// vnodeQuery ====================
int vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
#if 1
// SVBufPool
int vnodeOpenBufPool(SVnode* pVnode);
void vnodeCloseBufPool(SVnode* pVnode);
int vnodeBufPoolSwitch(SVnode* pVnode);
int vnodeBufPoolRecycle(SVnode* pVnode);
void* vnodeMalloc(SVnode* pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode* pVnode);
SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode);
// SVMemAllocator
typedef struct SVArenaNode {
TD_SLIST_NODE(SVArenaNode);
uint64_t size; // current node size
void* ptr;
char data[];
} SVArenaNode;
typedef struct SVMemAllocator {
T_REF_DECLARE()
TD_DLIST_NODE(SVMemAllocator);
uint64_t capacity;
uint64_t ssize;
uint64_t lsize;
SVArenaNode* pNode;
TD_SLIST(SVArenaNode) nlist;
} SVMemAllocator;
SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize);
void vmaDestroy(SVMemAllocator* pVMA);
void vmaReset(SVMemAllocator* pVMA);
void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
// vnodeCfg.h
extern const SVnodeCfg defaultVnodeOptions;
int vnodeValidateOptions(const SVnodeCfg*);
void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode);
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -43,29 +43,13 @@
extern "C" {
#endif
typedef struct STQ STQ;
typedef struct SMeta SMeta;
typedef struct STsdb STsdb;
typedef struct STQ STQ;
typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool;
typedef struct SQWorkerMgmt SQHandle;
typedef struct SVnodeTask {
TD_DLIST_NODE(SVnodeTask);
void* arg;
int (*execute)(void*);
} SVnodeTask;
typedef struct SVnodeMgr {
td_mode_flag_t vnodeInitFlag;
// For commit
bool stop;
uint16_t nthreads;
TdThread* threads;
TdThreadMutex mutex;
TdThreadCond hasTask;
TD_DLIST(SVnodeTask) queue;
} SVnodeMgr;
typedef struct {
int8_t streamType; // sma or other
int8_t dstType;
......@@ -81,8 +65,6 @@ typedef struct {
SHashObj* pHash; // streamId -> SStreamSinkInfo
} SSink;
extern SVnodeMgr vnodeMgr;
// SVState
struct SVState {
int64_t processed;
......@@ -107,57 +89,6 @@ struct SVnode {
STfs* pTfs;
};
int vnodeScheduleTask(SVnodeTask* task);
int vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
// vnodeCfg.h
extern const SVnodeCfg defaultVnodeOptions;
int vnodeValidateOptions(const SVnodeCfg*);
void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode);
// SVBufPool
int vnodeOpenBufPool(SVnode* pVnode);
void vnodeCloseBufPool(SVnode* pVnode);
int vnodeBufPoolSwitch(SVnode* pVnode);
int vnodeBufPoolRecycle(SVnode* pVnode);
void* vnodeMalloc(SVnode* pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode* pVnode);
SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode);
// SVMemAllocator
typedef struct SVArenaNode {
TD_SLIST_NODE(SVArenaNode);
uint64_t size; // current node size
void* ptr;
char data[];
} SVArenaNode;
typedef struct SVMemAllocator {
T_REF_DECLARE()
TD_DLIST_NODE(SVMemAllocator);
uint64_t capacity;
uint64_t ssize;
uint64_t lsize;
SVArenaNode* pNode;
TD_SLIST(SVArenaNode) nlist;
} SVMemAllocator;
SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize);
void vmaDestroy(SVMemAllocator* pVMA);
void vmaReset(SVMemAllocator* pVMA);
void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
......@@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
vnodeWaitCommit(pVnode);
vnodeBufPoolSwitch(pVnode);
SVnodeTask *pTask = (SVnodeTask *)taosMemoryMalloc(sizeof(*pTask));
pTask->execute = vnodeCommit; // TODO
pTask->arg = pVnode; // TODO
tsdbPrepareCommit(pVnode->pTsdb);
// metaPrepareCommit(pVnode->pMeta);
// walPreapareCommit(pVnode->pWal);
vnodeScheduleTask(pTask);
vnodeScheduleTask(vnodeCommit, pVnode);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册