diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d3fb6c9f8dd7ade6fa68c5cba8fcf27c5025d9ee..f92ba36845ab92d972fff25eb1b5676dae4775c5 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -31,22 +31,7 @@ #include "dnodeWrite.h" #include "vnode.h" -typedef struct { - int32_t vgId; // global vnode group ID - int32_t refCount; // reference count - EVnodeStatus status; // status: master, slave, notready, deleting - int64_t version; - void * wworker; - void * rworker; - void * wal; - void * tsdb; - void * replica; - void * events; - void * cq; // continuous query -} SVnodeObj; - static int32_t dnodeOpenVnodes(); -static void dnodeCleanupVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); @@ -56,7 +41,6 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static void dnodeSendStatusMsg(void *handle, void *tmrId); static void dnodeReadDnodeId(); -void *tsDnodeVnodesHash = NULL; static void *tsDnodeTmr = NULL; static void *tsStatusTimer = NULL; static uint32_t tsRebootTime; @@ -72,12 +56,6 @@ int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; - tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); - if (tsDnodeVnodesHash == NULL) { - dError("failed to init vnode list"); - return -1; - } - tsRebootTime = taosGetTimestampSec(); tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); @@ -86,6 +64,10 @@ int32_t dnodeInitMgmt() { return -1; } + if ( vnodeInitModule() != TSDB_CODE_SUCCESS) { + return -1; + } + int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -106,11 +88,7 @@ void dnodeCleanupMgmt() { tsDnodeTmr = NULL; } - dnodeCleanupVnodes(); - if (tsDnodeVnodesHash == NULL) { - taosCleanUpIntHash(tsDnodeVnodesHash); - tsDnodeVnodesHash = NULL; - } + vnodeCleanupModule(); } void dnodeMgmt(SRpcMsg *pMsg) { @@ -129,14 +107,6 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -void *dnodeGetVnodeWworker(void *pVnode) { - return ((SVnodeObj *)pVnode)->wworker; -} - -void *dnodeGetVnodeRworker(void *pVnode) { - return ((SVnodeObj *)pVnode)->rworker; -} - static int32_t dnodeOpenVnodes() { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { @@ -166,13 +136,6 @@ static int32_t dnodeOpenVnodes() { return TSDB_CODE_SUCCESS; } -typedef void (*CleanupFp)(char *); -static void dnodeCleanupVnodes() { - int32_t num = taosGetIntHashSize(tsDnodeVnodesHash); - taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); - dPrint("dnode mgmt is closed, vnodes:%d", num); -} - static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; @@ -219,19 +182,6 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { return tsCfgDynamicOptions(pCfg->config); } -static void dnodeBuildVloadMsg(char *pNode, void * param) { - SVnodeObj *pVnode = (SVnodeObj *) pNode; - if (pVnode->status == TSDB_VN_STATUS_DELETING) return; - - SDMStatusMsg *pStatus = param; - if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; - - SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; - pLoad->vgId = htonl(pVnode->vgId); - pLoad->vnode = htonl(pVnode->vgId); - pLoad->status = pVnode->status; -} - static void dnodeSendStatusMsg(void *handle, void *tmrId) { if (tsDnodeTmr == NULL) { dError("dnode timer is already released"); @@ -263,7 +213,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { pStatus->diskAvailable = tsAvailDataDirGB; pStatus->alternativeRole = (uint8_t) tsAlternativeRole; - taosVisitIntHashWithFp(tsDnodeVnodesHash, dnodeBuildVloadMsg, pStatus); + vnodeBuildStatusMsg(pStatus); contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); pStatus->openVnodes = htons(pStatus->openVnodes); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 4e37b5733ef80b8fae7d04e9e6c871442ac2bc5c..1e92b40977e91383b3304e2f0fed1f700b076278 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -127,7 +127,7 @@ void dnodeRead(SRpcMsg *pMsg) { } void *dnodeAllocateRqueue(void *pVnode) { - taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); + taos_queue queue = taosOpenQueue(); if (queue == NULL) return NULL; taosAddIntoQset(readQset, queue, pVnode); @@ -144,6 +144,8 @@ void *dnodeAllocateRqueue(void *pVnode) { } } + dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue); + return queue; } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index c9621697f03a92f2070ee6c7a577a3903966297b..2348cf62a4c2b295a47b185087f0ed5b16c00596 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -106,7 +106,7 @@ void dnodeWrite(SRpcMsg *pMsg) { void *dnodeAllocateWqueue(void *pVnode) { SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; - taos_queue *queue = taosOpenQueue(); + void *queue = taosOpenQueue(); if (queue == NULL) return NULL; if (pWorker->qset == NULL) { @@ -129,7 +129,7 @@ void *dnodeAllocateWqueue(void *pVnode) { wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } - dTrace("queue:%p is allocated for pVnode:%p", queue, pVnode); + dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue); return queue; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 8d7abbf36b0f1c382eb7474c616733860da85b25..fa5e3a3b3dc7c40d50d4c904d951dc44d7ac575e 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -44,7 +44,6 @@ void *dnodeAllocateRqueue(void *pVnode); void dnodeFreeRqueue(void *rqueue); void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code); - #ifdef __cplusplus } #endif diff --git a/src/inc/vnode.h b/src/inc/vnode.h index a8bf7a73ecb066a7e33b3e5311b1b837a5f153c6..0ca5b971804032978227327d625c17444469be3c 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -25,6 +25,9 @@ typedef struct { void *rsp; } SRspRet; +int32_t vnodeInitModule(); +void vnodeCleanupModule(); + int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vnode, char *rootDir); @@ -39,6 +42,7 @@ void* vnodeGetWal(void *pVnode); void* vnodeGetTsdb(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item); +void vnodeBuildStatusMsg(void * param); #ifdef __cplusplus } diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index bddf68932da310431a5be53ee9d2fed79bd89945..6d75776ac810140bdb54670e4cc723ef1e2b4e3f 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - //pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems); + //pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { *pitem = pNode->item; *type = pNode->type; num = 1; - //pTrace("item:%p is fetched", *pitem); + // pTrace("item:%p is fetched, type:%d", *pitem, *type); } return num; diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 76d74f7490dbdc9a0b8c68411f8e55d01a303040..5ee1d5c18b658ebb333ed24436eaaa5738abdf84 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -34,13 +34,13 @@ typedef struct { EVnStatus status; int role; int64_t version; - void * wqueue; - void * rqueue; - void * wal; - void * tsdb; - void * sync; - void * events; - void * cq; // continuous query + void *wqueue; + void *rqueue; + void *wal; + void *tsdb; + void *sync; + void *events; + void *cq; // continuous query } SVnodeObj; int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index e0449d909d25619c2058135493933b2ac70d5043..8f4ce802e788a0a4945b38f4ac9a90d73380a202 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -25,10 +25,32 @@ #include "ttime.h" #include "ttimer.h" #include "twal.h" +#include "dnode.h" +#include "vnode.h" #include "vnodeInt.h" -extern void *tsDnodeVnodesHash; -static void vnodeCleanUp(SVnodeObj *pVnode); +static void *tsDnodeVnodesHash; +static void vnodeCleanUp(SVnodeObj *pVnode); +static void vnodeBuildVloadMsg(char *pNode, void * param); + +int32_t vnodeInitModule() { + + vnodeInitWriteFp(); + + tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); + if (tsDnodeVnodesHash == NULL) { + dError("failed to init vnode list"); + return -1; + } + + return 0; +} + +typedef void (*CleanupFp)(char *); +void vnodeCleanupModule() { + taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); + taosCleanUpIntHash(tsDnodeVnodesHash); +} int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; @@ -95,9 +117,6 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; - static pthread_once_t vnodeInitWrite = PTHREAD_ONCE_INIT; - pthread_once(&vnodeInitWrite, vnodeInitWriteFp); - SVnodeObj vnodeObj = {0}; vnodeObj.vgId = vnode; vnodeObj.status = VN_STATUS_INIT; @@ -194,6 +213,24 @@ void *vnodeGetTsdb(void *pVnode) { return ((SVnodeObj *)pVnode)->tsdb; } +void vnodeBuildStatusMsg(void *param) { + SDMStatusMsg *pStatus = param; + taosVisitIntHashWithFp(tsDnodeVnodesHash, vnodeBuildVloadMsg, pStatus); +} + +static void vnodeBuildVloadMsg(char *pNode, void * param) { + SVnodeObj *pVnode = (SVnodeObj *) pNode; + if (pVnode->status == VN_STATUS_DELETING) return; + + SDMStatusMsg *pStatus = param; + if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; + + SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; + pLoad->vgId = htonl(pVnode->vgId); + pLoad->vnode = htonl(pVnode->vgId); + pLoad->status = pVnode->status; +} + static void vnodeCleanUp(SVnodeObj *pVnode) { taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index 59431e04c2042e46a53296777a5808371f65c2d2..2c31f2243bbe773c6c3857617200a823d0c42a09 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -26,7 +26,7 @@ #include "vnode.h" #include "vnodeInt.h" -static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*); +static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index 96819b47ff8caebd65738c9f31d70ddb10488897..e192e91e8a156a4d34a38101afbeb96c4adf9e03 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -284,7 +284,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW } // write into queue - (*writeFp)(pVnode, buffer, TAOS_QTYPE_WAL); + (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); } return code;