diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index c18647e57541300638cac2183a6e8a066906424e..0d521b6bb5202711274dd6ebab40f734c40b5120 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -23,8 +23,6 @@ extern "C" { int32_t dnodeInitRead(); void dnodeCleanupRead(); void dnodeRead(SRpcMsg *pMsg); -void * dnodeAllocateRqueue(); -void dnodeFreeRqueue(void *rqueue); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 054d9d796dd7f341afcf9142fb6b6b8c64feb027..028eaa95c2905016e7e444e2508af141b7dfeee5 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -23,8 +23,6 @@ extern "C" { int32_t dnodeInitWrite(); void dnodeCleanupWrite(); void dnodeWrite(SRpcMsg *pMsg); -void * dnodeAllocateWqueue(); -void dnodeFreeWqueue(void *worker); void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code); #ifdef __cplusplus diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 6f46587f2caebbc65fe946004362f52507df3095..c9621697f03a92f2070ee6c7a577a3903966297b 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -46,14 +46,12 @@ typedef struct _thread_obj { SWriteWorker *writeWorker; } SWriteWorkerPool; -static void *dnodeProcessWriteQueue(void *param); -static void dnodeHandleIdleWorker(SWriteWorker *pWorker); +static void *dnodeProcessWriteQueue(void *param); +static void dnodeHandleIdleWorker(SWriteWorker *pWorker); SWriteWorkerPool wWorkerPool; int32_t dnodeInitWrite() { - - vnodeInitWrite(); wWorkerPool.max = tsNumOfCores; wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); @@ -217,7 +215,6 @@ static void *dnodeProcessWriteQueue(void *param) { return NULL; } - static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { int32_t num = taosGetQueueNumber(pWorker->qset); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index b94d4cbacb44fae1acf267e54c6e2e59639b8436..8d7abbf36b0f1c382eb7474c616733860da85b25 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -38,6 +38,13 @@ typedef enum { SDnodeRunStatus dnodeGetRunStatus(); SDnodeStatisInfo dnodeGetStatisInfo(); +void *dnodeAllocateWqueue(void *pVnode); +void dnodeFreeWqueue(void *queue); +void *dnodeAllocateRqueue(void *pVnode); +void dnodeFreeRqueue(void *rqueue); +void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code); + + #ifdef __cplusplus } #endif diff --git a/src/inc/vnode.h b/src/inc/vnode.h index d5edea6564cf9c6ebe4abbd298f54bfb1ca39ef2..a8bf7a73ecb066a7e33b3e5311b1b837a5f153c6 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -25,15 +25,14 @@ typedef struct { void *rsp; } SRspRet; -int32_t vnodeInitWrite(); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vnode, char *rootDir); int32_t vnodeClose(void *pVnode); void vnodeRelease(void *pVnode); - void* vnodeGetVnode(int32_t vgId); + void* vnodeGetRqueue(void *); void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWal(void *pVnode); diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 99dda7d389a0b9bed712358a5bcac4ad155dda77..76d74f7490dbdc9a0b8c68411f8e55d01a303040 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -43,7 +43,8 @@ typedef struct { void * cq; // continuous query } SVnodeObj; -int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); +int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); +void vnodeInitWriteFp(void); #ifdef __cplusplus } diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index b4d1349e2953c377f57d8f36b36023ace0648846..e0449d909d25619c2058135493933b2ac70d5043 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -25,11 +25,6 @@ #include "ttime.h" #include "ttimer.h" #include "twal.h" -#include "dnodeMClient.h" -#include "dnodeMgmt.h" -#include "dnodeRead.h" -#include "dnodeWrite.h" -#include "vnode.h" #include "vnodeInt.h" extern void *tsDnodeVnodesHash; @@ -100,6 +95,9 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + static pthread_once_t vnodeInitWrite = PTHREAD_ONCE_INIT; + pthread_once(&vnodeInitWrite, vnodeInitWriteFp); + SVnodeObj vnodeObj = {0}; vnodeObj.vgId = vnode; vnodeObj.status = VN_STATUS_INIT; diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index 2b05aab315cb96e96d80359d44378c62ce2e57a4..59431e04c2042e46a53296777a5808371f65c2d2 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -23,8 +23,6 @@ #include "tsdb.h" #include "twal.h" #include "dataformat.h" -#include "dnodeWrite.h" -#include "dnodeMgmt.h" #include "vnode.h" #include "vnodeInt.h" @@ -35,14 +33,12 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -int32_t vnodeInitWrite() { +void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg; - - return 0; } int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) {