diff --git a/include/util/tmacro.h b/include/util/tmacro.h
index 74056cfe071366c8f93d2de93cad7a28169c6d01..5cca8a10629c8d818fe52e331f3a84a11f7fd999 100644
--- a/include/util/tmacro.h
+++ b/include/util/tmacro.h
@@ -29,13 +29,11 @@ extern "C" {
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
-#define TD_DEF_MOD_INIT_FLAG(MOD) static int8_t MOD##InitFlag = TD_MOD_UNINITIALIZED
-#define TD_DEF_MOD_CLEAR_FLAG(MOD) static int8_t MOD##ClearFlag = TD_MOD_UNCLEARD
+typedef int8_t td_mode_flag_t;
-#define TD_CHECK_AND_SET_MODE_INIT(MOD) \
- atomic_val_compare_exchange_8(&(MOD##InitFlag), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
+#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
-#define TD_CHECK_AND_SET_MOD_CLEAR(MOD) atomic_val_compare_exchange_8(&(MOD##ClearFlag), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
+#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/impl/inc/vnodeCommit.h b/source/dnode/vnode/impl/inc/vnodeCommit.h
index 8f0af27513e46f75f3fbe53f81ecb78e4fa6a12a..a60e8feac21fcc6361405fb67247e77d8739d7dd 100644
--- a/source/dnode/vnode/impl/inc/vnodeCommit.h
+++ b/source/dnode/vnode/impl/inc/vnodeCommit.h
@@ -22,9 +22,6 @@
extern "C" {
#endif
-int vnodeInitCommit(uint16_t nthreads);
-void vnodeClearCommit();
-
#define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeAsyncCommit(SVnode *pVnode);
diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h
index e3a3fac6b90839da79c39450669e95ce45e1b640..e6a88c7629635ace1449d6269a55165bd52f3713 100644
--- a/source/dnode/vnode/impl/inc/vnodeDef.h
+++ b/source/dnode/vnode/impl/inc/vnodeDef.h
@@ -21,9 +21,11 @@
#include "tcoding.h"
#include "tdlist.h"
#include "tlockfree.h"
+#include "tmacro.h"
#include "wal.h"
#include "vnode.h"
+
#include "vnodeBufferPool.h"
#include "vnodeCfg.h"
#include "vnodeCommit.h"
@@ -37,6 +39,27 @@
extern "C" {
#endif
+typedef struct SVnodeTask {
+ TD_DLIST_NODE(SVnodeTask);
+ void* arg;
+ int (*execute)(void*);
+} SVnodeTask;
+
+typedef struct SVnodeMgr {
+ td_mode_flag_t vnodeInitFlag;
+ td_mode_flag_t vnodeClearFlag;
+ // For commit
+ bool stop;
+ uint16_t nthreads;
+ pthread_t* threads;
+ pthread_mutex_t mutex;
+ pthread_cond_t hasTask;
+ TD_DLIST(SVnodeTask) queue;
+ // For vnode Mgmt
+} SVnodeMgr;
+
+extern SVnodeMgr vnodeMgr;
+
struct SVnode {
char* path;
SVnodeCfg config;
@@ -50,6 +73,8 @@ struct SVnode {
SVnodeFS* pFs;
};
+int vnodeScheduleTask(SVnodeTask* task);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/dnode/vnode/impl/src/vnodeCommit.c b/source/dnode/vnode/impl/src/vnodeCommit.c
index 944fe80b3121bca4a8f7dcac4c6a04df5e0146e9..cac7999f59c8fcb70308c64a79f0edcc5fa7a9ac 100644
--- a/source/dnode/vnode/impl/src/vnodeCommit.c
+++ b/source/dnode/vnode/impl/src/vnodeCommit.c
@@ -18,15 +18,6 @@
static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
-int vnodeInitCommit(uint16_t nthreads) {
- // TODO
- return 0;
-}
-
-void vnodeClearCommit() {
- // TODO
-}
-
int vnodeAsyncCommit(SVnode *pVnode) {
#if 0
if (vnodeStartCommit(pVnode) < 0) {
diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c
index 63fc0d52f016d437749b4af46be260027b2b7bb3..59e3bae5d7581986caa4b71dbb11b9178b9eede5 100644
--- a/source/dnode/vnode/impl/src/vnodeMain.c
+++ b/source/dnode/vnode/impl/src/vnodeMain.c
@@ -13,7 +13,6 @@
* along with this program. If not, see .
*/
-#include "tmacro.h"
#include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
@@ -21,35 +20,6 @@ static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode);
-TD_DEF_MOD_INIT_FLAG(vnode);
-TD_DEF_MOD_CLEAR_FLAG(vnode);
-
-int vnodeInit(uint16_t nthreads) {
- if (TD_CHECK_AND_SET_MODE_INIT(vnode) == TD_MOD_INITIALIZED) {
- return 0;
- }
-
- if (walInit() < 0) {
- return -1;
- }
-
- if (vnodeInitCommit(nthreads) < 0) {
- return -1;
- }
-
- return 0;
-}
-
-void vnodeClear() {
- if (TD_CHECK_AND_SET_MOD_CLEAR(vnode) == TD_MOD_CLEARD) {
- return;
- }
-
- walCleanUp();
-
- vnodeClearCommit();
-}
-
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL;
diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c
new file mode 100644
index 0000000000000000000000000000000000000000..b5bd913c52fc35895b3806f19d20a71c3591df71
--- /dev/null
+++ b/source/dnode/vnode/impl/src/vnodeMgr.c
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "vnodeDef.h"
+
+SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED, .vnodeClearFlag = TD_MOD_UNCLEARD, .stop = false};
+
+static void* loop(void* arg);
+
+int vnodeInit(uint16_t nthreads) {
+ if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
+ return 0;
+ }
+
+ // Start commit handers
+ if (nthreads > 0) {
+ vnodeMgr.nthreads = nthreads;
+ vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
+ if (vnodeMgr.threads == NULL) {
+ return -1;
+ }
+
+ pthread_mutex_init(&(vnodeMgr.mutex), NULL);
+ pthread_cond_init(&(vnodeMgr.hasTask), NULL);
+ tDListInit(&(vnodeMgr.queue));
+
+ for (uint16_t i = 0; i < nthreads; i++) {
+ pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
+ }
+ } else {
+ // TODO: if no commit thread is set, then another mechanism should be
+ // given. Otherwise, it is a false.
+ ASSERT(0);
+ }
+
+ if (walInit() < 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+void vnodeClear() {
+ if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeClearFlag)) == TD_MOD_CLEARD) {
+ return;
+ }
+
+ walCleanUp();
+
+ // Stop commit handler
+ pthread_mutex_lock(&(vnodeMgr.mutex));
+ vnodeMgr.stop = true;
+ pthread_cond_broadcast(&(vnodeMgr.hasTask));
+ pthread_mutex_unlock(&(vnodeMgr.mutex));
+
+ for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) {
+ pthread_join(vnodeMgr.threads[i], NULL);
+ }
+
+ tfree(vnodeMgr.threads);
+ pthread_cond_destroy(&(vnodeMgr.hasTask));
+ pthread_mutex_destroy(&(vnodeMgr.mutex));
+}
+
+static void* loop(void* arg) {
+ SVnodeTask* pTask;
+ for (;;) {
+ pthread_mutex_lock(&(vnodeMgr.mutex));
+ for (;;) {
+ pTask = TD_DLIST_HEAD(&(vnodeMgr.queue));
+ if (pTask == NULL) {
+ if (vnodeMgr.stop) {
+ pthread_mutex_unlock(&(vnodeMgr.mutex));
+ return NULL;
+ } else {
+ pthread_cond_wait(&(vnodeMgr.hasTask), &(vnodeMgr.mutex));
+ }
+ } else {
+ tDListPop(&(vnodeMgr.queue), pTask);
+ break;
+ }
+ }
+
+ pthread_mutex_unlock(&(vnodeMgr.mutex));
+
+ (*(pTask->execute))(pTask->arg);
+ }
+
+ return NULL;
+}
\ No newline at end of file