提交 1d54f14c 编写于 作者: H Hongze Cheng

implement commit threads and queue

上级 14ae798f
...@@ -29,13 +29,11 @@ extern "C" { ...@@ -29,13 +29,11 @@ extern "C" {
#define TD_MOD_UNCLEARD 0 #define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1 #define TD_MOD_CLEARD 1
#define TD_DEF_MOD_INIT_FLAG(MOD) static int8_t MOD##InitFlag = TD_MOD_UNINITIALIZED typedef int8_t td_mode_flag_t;
#define TD_DEF_MOD_CLEAR_FLAG(MOD) static int8_t MOD##ClearFlag = TD_MOD_UNCLEARD
#define TD_CHECK_AND_SET_MODE_INIT(MOD) \ #define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
atomic_val_compare_exchange_8(&(MOD##InitFlag), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(MOD) atomic_val_compare_exchange_8(&(MOD##ClearFlag), TD_MOD_UNCLEARD, TD_MOD_CLEARD) #define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,9 +22,6 @@ ...@@ -22,9 +22,6 @@
extern "C" { extern "C" {
#endif #endif
int vnodeInitCommit(uint16_t nthreads);
void vnodeClearCommit();
#define vnodeShouldCommit vnodeBufPoolIsFull #define vnodeShouldCommit vnodeBufPoolIsFull
int vnodeAsyncCommit(SVnode *pVnode); int vnodeAsyncCommit(SVnode *pVnode);
......
...@@ -21,9 +21,11 @@ ...@@ -21,9 +21,11 @@
#include "tcoding.h" #include "tcoding.h"
#include "tdlist.h" #include "tdlist.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tmacro.h"
#include "wal.h" #include "wal.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeBufferPool.h" #include "vnodeBufferPool.h"
#include "vnodeCfg.h" #include "vnodeCfg.h"
#include "vnodeCommit.h" #include "vnodeCommit.h"
...@@ -37,6 +39,27 @@ ...@@ -37,6 +39,27 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SVnodeTask {
TD_DLIST_NODE(SVnodeTask);
void* arg;
int (*execute)(void*);
} SVnodeTask;
typedef struct SVnodeMgr {
td_mode_flag_t vnodeInitFlag;
td_mode_flag_t vnodeClearFlag;
// For commit
bool stop;
uint16_t nthreads;
pthread_t* threads;
pthread_mutex_t mutex;
pthread_cond_t hasTask;
TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt
} SVnodeMgr;
extern SVnodeMgr vnodeMgr;
struct SVnode { struct SVnode {
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
...@@ -50,6 +73,8 @@ struct SVnode { ...@@ -50,6 +73,8 @@ struct SVnode {
SVnodeFS* pFs; SVnodeFS* pFs;
}; };
int vnodeScheduleTask(SVnodeTask* task);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -18,15 +18,6 @@ ...@@ -18,15 +18,6 @@
static int vnodeStartCommit(SVnode *pVnode); static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode); static int vnodeEndCommit(SVnode *pVnode);
int vnodeInitCommit(uint16_t nthreads) {
// TODO
return 0;
}
void vnodeClearCommit() {
// TODO
}
int vnodeAsyncCommit(SVnode *pVnode) { int vnodeAsyncCommit(SVnode *pVnode) {
#if 0 #if 0
if (vnodeStartCommit(pVnode) < 0) { if (vnodeStartCommit(pVnode) < 0) {
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tmacro.h"
#include "vnodeDef.h" #include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
...@@ -21,35 +20,6 @@ static void vnodeFree(SVnode *pVnode); ...@@ -21,35 +20,6 @@ static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode);
TD_DEF_MOD_INIT_FLAG(vnode);
TD_DEF_MOD_CLEAR_FLAG(vnode);
int vnodeInit(uint16_t nthreads) {
if (TD_CHECK_AND_SET_MODE_INIT(vnode) == TD_MOD_INITIALIZED) {
return 0;
}
if (walInit() < 0) {
return -1;
}
if (vnodeInitCommit(nthreads) < 0) {
return -1;
}
return 0;
}
void vnodeClear() {
if (TD_CHECK_AND_SET_MOD_CLEAR(vnode) == TD_MOD_CLEARD) {
return;
}
walCleanUp();
vnodeClearCommit();
}
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED, .vnodeClearFlag = TD_MOD_UNCLEARD, .stop = false};
static void* loop(void* arg);
int vnodeInit(uint16_t nthreads) {
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
return 0;
}
// Start commit handers
if (nthreads > 0) {
vnodeMgr.nthreads = nthreads;
vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
if (vnodeMgr.threads == NULL) {
return -1;
}
pthread_mutex_init(&(vnodeMgr.mutex), NULL);
pthread_cond_init(&(vnodeMgr.hasTask), NULL);
tDListInit(&(vnodeMgr.queue));
for (uint16_t i = 0; i < nthreads; i++) {
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
}
} else {
// TODO: if no commit thread is set, then another mechanism should be
// given. Otherwise, it is a false.
ASSERT(0);
}
if (walInit() < 0) {
return -1;
}
return 0;
}
void vnodeClear() {
if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeClearFlag)) == TD_MOD_CLEARD) {
return;
}
walCleanUp();
// Stop commit handler
pthread_mutex_lock(&(vnodeMgr.mutex));
vnodeMgr.stop = true;
pthread_cond_broadcast(&(vnodeMgr.hasTask));
pthread_mutex_unlock(&(vnodeMgr.mutex));
for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) {
pthread_join(vnodeMgr.threads[i], NULL);
}
tfree(vnodeMgr.threads);
pthread_cond_destroy(&(vnodeMgr.hasTask));
pthread_mutex_destroy(&(vnodeMgr.mutex));
}
static void* loop(void* arg) {
SVnodeTask* pTask;
for (;;) {
pthread_mutex_lock(&(vnodeMgr.mutex));
for (;;) {
pTask = TD_DLIST_HEAD(&(vnodeMgr.queue));
if (pTask == NULL) {
if (vnodeMgr.stop) {
pthread_mutex_unlock(&(vnodeMgr.mutex));
return NULL;
} else {
pthread_cond_wait(&(vnodeMgr.hasTask), &(vnodeMgr.mutex));
}
} else {
tDListPop(&(vnodeMgr.queue), pTask);
break;
}
}
pthread_mutex_unlock(&(vnodeMgr.mutex));
(*(pTask->execute))(pTask->arg);
}
return NULL;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册