提交 f1f666e7 编写于 作者: S Shengliang Guan

add message queue for vnode

上级 530ffb6c
......@@ -66,15 +66,26 @@ typedef struct SVnodeCfg {
SWalCfg walCfg;
} SVnodeCfg;
typedef struct SDnode SDnode;
typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct {
int32_t sver;
SDnode *pDnode;
char *timezone;
char *locale;
char *charset;
PutReqToVQueryQFp putReqToVQueryQFp;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
} SVnodeOpt;
/* ------------------------ SVnode ------------------------ */
/**
* @brief Initialize the vnode module
*
* @param nthreads number of commit threads. 0 for no threads and
* a schedule queue should be given (TODO)
* @param pOption Option of the vnode mnodule
* @return int 0 for success and -1 for failure
*/
int vnodeInit(uint16_t nthreads);
int vnodeInit(const SVnodeOpt *pOption);
/**
* @brief clear a vnode
......
......@@ -22,8 +22,8 @@
#include "dndTransport.h"
#include "dndVnodes.h"
#include "sync.h"
#include "wal.h"
#include "tfs.h"
#include "wal.h"
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
......@@ -153,6 +153,8 @@ static void dndCleanupEnv(SDnode *pDnode) {
taosStopCacheRefreshWorker();
}
static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); }
SDnode *dndInit(SDnodeOpt *pOption) {
taosIgnSIGPIPE();
taosBlockSIGPIPE();
......@@ -196,7 +198,16 @@ SDnode *dndInit(SDnodeOpt *pOption) {
return NULL;
}
if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) {
SVnodeOpt vnodeOpt = {
.sver = pDnode->opt.sver,
.pDnode = pDnode,
.timezone = pDnode->opt.timezone,
.locale = pDnode->opt.locale,
.charset = pDnode->opt.charset,
.putReqToVQueryQFp = dndPutMsgToVQueryQ,
.nthreads = pDnode->opt.numOfCommitThreads,
};
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode env");
dndCleanup(pDnode);
return NULL;
......
......@@ -57,6 +57,8 @@ typedef struct SVnodeMgr {
pthread_cond_t hasTask;
TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt
SDnode* pDnode;
PutReqToVQueryQFp putReqToVQueryQFp;
} SVnodeMgr;
extern SVnodeMgr vnodeMgr;
......@@ -79,6 +81,8 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task);
void vnodePutReqToVQueryQ(struct SRpcMsg *pReq);
#ifdef __cplusplus
}
#endif
......
......@@ -19,17 +19,19 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};
static void* loop(void* arg);
int vnodeInit(uint16_t nthreads) {
int vnodeInit(const SVnodeOpt *pOption) {
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
return 0;
}
vnodeMgr.stop = false;
vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp;
vnodeMgr.pDnode = pOption->pDnode;
// Start commit handers
if (nthreads > 0) {
vnodeMgr.nthreads = nthreads;
vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
if (pOption->nthreads > 0) {
vnodeMgr.nthreads = pOption->nthreads;
vnodeMgr.threads = (pthread_t*)calloc(pOption->nthreads, sizeof(pthread_t));
if (vnodeMgr.threads == NULL) {
return -1;
}
......@@ -38,7 +40,7 @@ int vnodeInit(uint16_t nthreads) {
pthread_cond_init(&(vnodeMgr.hasTask), NULL);
TD_DLIST_INIT(&(vnodeMgr.queue));
for (uint16_t i = 0; i < nthreads; i++) {
for (uint16_t i = 0; i < pOption->nthreads; i++) {
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
}
......@@ -89,6 +91,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
return 0;
}
void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) {
if (vnodeMgr.putReqToVQueryQFp) {
(*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq);
}
}
/* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) {
SVnodeTask* pTask;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册