diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index ba628a808d10ffe17feaf8a4ed606e331adf0d52..ad9153cf8319b826141bd684b31516b3ae9ffa98 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -13,8 +13,8 @@ target_sources( "src/vnd/vnodeQuery.c" "src/vnd/vnodeStateMgr.c" "src/vnd/vnodeWrite.c" - # "src/vnd/vnodeModule.c" - "src/vnd/vnodeMgr.c" + "src/vnd/vnodeModule.c" + # "src/vnd/vnodeMgr.c" # meta # "src/meta/metaBDBImpl.c" diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index cf6296203063472a2acef1c105c784bcd6edd0c3..6957cfdb73a02d1a82a6313bc15433829f283001 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -20,7 +20,7 @@ extern "C" { #endif -// vndDebug ==================== +// vnodeDebug ==================== // clang-format off #define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) @@ -30,6 +30,61 @@ extern "C" { #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +// vnodeModule ==================== +int vnodeScheduleTask(int (*execute)(void*), void* arg); + +// vnodeQuery ==================== +int vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryClose(SVnode* pVnode); + +#if 1 +// SVBufPool +int vnodeOpenBufPool(SVnode* pVnode); +void vnodeCloseBufPool(SVnode* pVnode); +int vnodeBufPoolSwitch(SVnode* pVnode); +int vnodeBufPoolRecycle(SVnode* pVnode); +void* vnodeMalloc(SVnode* pVnode, uint64_t size); +bool vnodeBufPoolIsFull(SVnode* pVnode); + +SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode); + +// SVMemAllocator +typedef struct SVArenaNode { + TD_SLIST_NODE(SVArenaNode); + uint64_t size; // current node size + void* ptr; + char data[]; +} SVArenaNode; + +typedef struct SVMemAllocator { + T_REF_DECLARE() + TD_DLIST_NODE(SVMemAllocator); + uint64_t capacity; + uint64_t ssize; + uint64_t lsize; + SVArenaNode* pNode; + TD_SLIST(SVArenaNode) nlist; +} SVMemAllocator; + +SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize); +void vmaDestroy(SVMemAllocator* pVMA); +void vmaReset(SVMemAllocator* pVMA); +void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); +void vmaFree(SVMemAllocator* pVMA, void* ptr); +bool vmaIsFull(SVMemAllocator* pVMA); + +// vnodeCfg.h +extern const SVnodeCfg defaultVnodeOptions; + +int vnodeValidateOptions(const SVnodeCfg*); +void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc); + +// For commit +#define vnodeShouldCommit vnodeBufPoolIsFull +int vnodeSyncCommit(SVnode* pVnode); +int vnodeAsyncCommit(SVnode* pVnode); +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6fbf4fb1dfd167159c3d55a6a60b2ba6e05cb1c0..c956184fc9fd64081c62e0bfdee92460b687a704 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -43,29 +43,13 @@ extern "C" { #endif -typedef struct STQ STQ; - +typedef struct SMeta SMeta; +typedef struct STsdb STsdb; +typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; typedef struct SQWorkerMgmt SQHandle; -typedef struct SVnodeTask { - TD_DLIST_NODE(SVnodeTask); - void* arg; - int (*execute)(void*); -} SVnodeTask; - -typedef struct SVnodeMgr { - td_mode_flag_t vnodeInitFlag; - // For commit - bool stop; - uint16_t nthreads; - TdThread* threads; - TdThreadMutex mutex; - TdThreadCond hasTask; - TD_DLIST(SVnodeTask) queue; -} SVnodeMgr; - typedef struct { int8_t streamType; // sma or other int8_t dstType; @@ -81,8 +65,6 @@ typedef struct { SHashObj* pHash; // streamId -> SStreamSinkInfo } SSink; -extern SVnodeMgr vnodeMgr; - // SVState struct SVState { int64_t processed; @@ -107,57 +89,6 @@ struct SVnode { STfs* pTfs; }; -int vnodeScheduleTask(SVnodeTask* task); -int vnodeQueryOpen(SVnode* pVnode); -void vnodeQueryClose(SVnode* pVnode); - -// vnodeCfg.h -extern const SVnodeCfg defaultVnodeOptions; - -int vnodeValidateOptions(const SVnodeCfg*); -void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc); - -// For commit -#define vnodeShouldCommit vnodeBufPoolIsFull -int vnodeSyncCommit(SVnode* pVnode); -int vnodeAsyncCommit(SVnode* pVnode); - -// SVBufPool - -int vnodeOpenBufPool(SVnode* pVnode); -void vnodeCloseBufPool(SVnode* pVnode); -int vnodeBufPoolSwitch(SVnode* pVnode); -int vnodeBufPoolRecycle(SVnode* pVnode); -void* vnodeMalloc(SVnode* pVnode, uint64_t size); -bool vnodeBufPoolIsFull(SVnode* pVnode); - -SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode); - -// SVMemAllocator -typedef struct SVArenaNode { - TD_SLIST_NODE(SVArenaNode); - uint64_t size; // current node size - void* ptr; - char data[]; -} SVArenaNode; - -typedef struct SVMemAllocator { - T_REF_DECLARE() - TD_DLIST_NODE(SVMemAllocator); - uint64_t capacity; - uint64_t ssize; - uint64_t lsize; - SVArenaNode* pNode; - TD_SLIST(SVArenaNode) nlist; -} SVMemAllocator; - -SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize); -void vmaDestroy(SVMemAllocator* pVMA); -void vmaReset(SVMemAllocator* pVMA); -void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); -void vmaFree(SVMemAllocator* pVMA, void* ptr); -bool vmaIsFull(SVMemAllocator* pVMA); - // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 1235637e198c62d73e2de99b7773406afc9d23af..b4c3725a5e9f402c911f828d21aeb92280006d96 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) { vnodeWaitCommit(pVnode); vnodeBufPoolSwitch(pVnode); - SVnodeTask *pTask = (SVnodeTask *)taosMemoryMalloc(sizeof(*pTask)); - - pTask->execute = vnodeCommit; // TODO - pTask->arg = pVnode; // TODO - tsdbPrepareCommit(pVnode->pTsdb); - // metaPrepareCommit(pVnode->pMeta); - // walPreapareCommit(pVnode->pWal); - vnodeScheduleTask(pTask); + vnodeScheduleTask(vnodeCommit, pVnode); + return 0; }