diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index a5c7be74b5ea46ca7b66e4b8a2c6e85e96b769b6..e4f32b0f8cb40c6a48a93bd003c74eb5e1e0e394 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -20,12 +20,11 @@ extern "C" { #endif -int dnodeInitRead(); -void dnodeCleanupRead(); -void dnodeRead(SRpcMsg *); -void *dnodeAllocateReadWorker(); -void dnodeFreeReadWorker(void *rqueue); - +int32_t dnodeInitRead(); +void dnodeCleanupRead(); +void dnodeRead(void *pMsg); +void * dnodeAllocateReadWorker(); +void dnodeFreeReadWorker(void *rqueue); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 51340fe1c27c4016a183735b4ceab482da430074..2b1edf9e4023ad73e60c283f419a8e0a72876219 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -20,12 +20,11 @@ extern "C" { #endif -int dnodeInitWrite(); -void dnodeCleanupWrite(); -void dnodeWrite(SRpcMsg *pMsg); -void *dnodeAllocateWriteWorker(); -void dnodeFreeWriteWorker(void *worker); - +int32_t dnodeInitWrite(); +void dnodeCleanupWrite(); +void dnodeWrite(void *pMsg); +void * dnodeAllocateWriteWorker(); +void dnodeFreeWriteWorker(void *worker); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 25055643dfdc057a041607b287105c8e6d1a6b14..df65ddeac123edf6893aa042124756d68cf78d9f 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -56,7 +56,7 @@ static void * tsDnodeVnodesHash = NULL; int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeProcessDropVnodeMsg; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_VNODE] = dnodeProcessDropVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { @@ -176,26 +176,24 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) { // remove read queue dnodeFreeReadWorker(pVnode->rworker); + pVnode->rworker = NULL; // remove write queue dnodeFreeWriteWorker(pVnode->wworker); + pVnode->wworker = NULL; // remove wal // remove tsdb if (pVnode->tsdb) { tsdbCloseRepo(pVnode->tsdb); + pVnode->tsdb = NULL; } taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); } static int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnodeCfg) { - pVnodeCfg->vnode = htonl(pVnodeCfg->vnode); - pVnodeCfg->cfg.vgId = htonl(pVnodeCfg->cfg.vgId); - pVnodeCfg->cfg.maxSessions = htonl(pVnodeCfg->cfg.maxSessions); - pVnodeCfg->cfg.daysPerFile = htonl(pVnodeCfg->cfg.daysPerFile); - STsdbCfg tsdbCfg; tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.tsdbId = pVnodeCfg->vnode; @@ -248,47 +246,60 @@ static void dnodeDropVnode(SVnodeObj *pVnode) { dnodeCleanupVnode(pVnode); } -static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) { - -// SVnodeObj *pVnode; -// int32_t vgId; -// SVPeersMsg *pCfg; - - // check everything, if not ok, set terrno; - +static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - // everything is ok + SCreateVnodeMsg *pCreate = (SCreateVnodeMsg *) rpcMsg->pCont; + pCreate->vnode = htonl(pCreate->vnode); + pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); + pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); + pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); -// dnodeCreateVnode(vgId, pCfg); + SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); + if (pVnodeObj != NULL) { + rpcRsp.code = TSDB_CODE_SUCCESS; + } else { + rpcRsp.code = dnodeCreateVnode(pCreate); + } - //if (pVnode == NULL) terrno = TSDB_CODE + rpcSendResponse(&rpcRsp); + rpcFreeCont(rpcMsg->pCont); } -static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) { - - SVnodeObj *pVnode; - int32_t vgId; - - // check everything, if not ok, set terrno; +static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SDropVnodeMsg *pDrop = (SCreateVnodeMsg *) rpcMsg->pCont; + pDrop->vgId = htonl(pDrop->vgId); - // everything is ok - dnodeDropVnode(pVnode); + SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId); + if (pVnodeObj != NULL) { + dnodeDropVnode(pVnodeObj); + rpcRsp.code = TSDB_CODE_SUCCESS; + } else { + rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID; + } - //if (pVnode == NULL) terrno = TSDB_CODE + rpcSendResponse(&rpcRsp); + rpcFreeCont(rpcMsg->pCont); } -static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) { - - SVnodeObj *pVnode; - int32_t vgId; - - // check everything, if not ok, set terrno; +static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SCreateVnodeMsg *pCreate = (SCreateVnodeMsg *) rpcMsg->pCont; + pCreate->vnode = htonl(pCreate->vnode); + pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); + pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); + pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - // everything is ok -// dnodeAlterVnode(pVnode); + SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); + if (pVnodeObj != NULL) { + rpcRsp.code = TSDB_CODE_SUCCESS; + } else { + rpcRsp.code = dnodeCreateVnode(pCreate);; + } - //if (pVnode == NULL) terrno = TSDB_CODE + rpcSendResponse(&rpcRsp); + rpcFreeCont(rpcMsg->pCont); } - diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 8175bea691c872410b29fb1b65ffd393b41e39b7..dacc93ffc2bdefdc6b1ce1812dede80d55a150c3 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -16,10 +16,10 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" -#include "tlog.h" -#include "trpc.h" #include "taosmsg.h" +#include "tlog.h" #include "tqueue.h" +#include "trpc.h" #include "dnodeRead.h" #include "dnodeMgmt.h" @@ -31,7 +31,7 @@ typedef struct { typedef struct { void *pCont; - int contLen; + int32_t contLen; SRpcMsg rpcMsg; void *pVnode; SRpcContext *pRpcContext; // RPC message context @@ -42,16 +42,16 @@ static void dnodeProcessReadResult(SReadMsg *pRead); static void dnodeHandleIdleReadWorker(); static void dnodeProcessQueryMsg(SReadMsg *pMsg); static void dnodeProcessRetrieveMsg(SReadMsg *pMsg); -static void (*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode); +static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode); // module global variable static taos_qset readQset; -static int threads; // number of query threads -static int maxThreads; -static int minThreads; +static int32_t threads; // number of query threads +static int32_t maxThreads; +static int32_t minThreads; -int dnodeInitRead() { - dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg; +int32_t dnodeInitRead() { + dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg; dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg; readQset = taosOpenQset(); @@ -67,12 +67,14 @@ void dnodeCleanupRead() { taosCloseQset(readQset); } -void dnodeRead(SRpcMsg *pMsg) { - int leftLen = pMsg->contLen; - char *pCont = (char *)pMsg->pCont; - int contLen = 0; - int numOfVnodes = 0; - int32_t vgId = 0; +void dnodeRead(void *rpcMsg) { + SRpcMsg *pMsg = rpcMsg; + + int32_t leftLen = pMsg->contLen; + char *pCont = (char *) pMsg->pCont; + int32_t contLen = 0; + int32_t numOfVnodes = 0; + int32_t vgId = 0; SRpcContext *pRpcContext = NULL; // parse head, get number of vnodes; @@ -87,31 +89,31 @@ void dnodeRead(SRpcMsg *pMsg) { // get pVnode from vgId void *pVnode = dnodeGetVnode(vgId); if (pVnode == NULL) { - continue; } - + // put message into queue SReadMsg readMsg; - readMsg.rpcMsg = *pMsg; - readMsg.pCont = pCont; - readMsg.contLen = contLen; - readMsg.pRpcContext = pRpcContext; - readMsg.pVnode = pVnode; + readMsg.rpcMsg = *pMsg; + readMsg.pCont = pCont; + readMsg.contLen = contLen; + readMsg.pRpcContext = pRpcContext; + readMsg.pVnode = pVnode; taos_queue queue = dnodeGetVnodeRworker(pVnode); taosWriteQitem(queue, &readMsg); - - // next vnode + + // next vnode leftLen -= contLen; - pCont -= contLen; + pCont -= contLen; + + dnodeReleaseVnode(pVnode); } } void *dnodeAllocateReadWorker() { - taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); - if ( queue == NULL ) return NULL; + if (queue == NULL) return NULL; taosAddIntoQset(readQset, queue); @@ -131,7 +133,6 @@ void *dnodeAllocateReadWorker() { } void dnodeFreeReadWorker(void *rqueue) { - taosCloseQueue(rqueue); // dynamically adjust the number of threads @@ -144,16 +145,16 @@ static void *dnodeProcessReadQueue(void *param) { while (1) { if (taosReadQitemFromQset(qset, &readMsg) <= 0) { dnodeHandleIdleReadWorker(); - continue; + continue; } terrno = 0; if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) { (*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg); } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - + dnodeProcessReadResult(&readMsg); } @@ -161,7 +162,7 @@ static void *dnodeProcessReadQueue(void *param) { } static void dnodeHandleIdleReadWorker() { - int num = taosGetQueueNumber(readQset); + int32_t num = taosGetQueueNumber(readQset); if (num == 0 || (num <= minThreads && threads > minThreads)) { threads--; @@ -180,10 +181,10 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { if (pRpcContext) { if (terrno) { - if (pRpcContext->code == 0) pRpcContext->code = terrno; + if (pRpcContext->code == 0) pRpcContext->code = terrno; } - int count = atomic_add_fetch_32(&pRpcContext->count, 1); + int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1); if (count < pRpcContext->numOfVnodes) { // not over yet, multiple vnodes return; @@ -197,8 +198,8 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { SRpcMsg rsp; rsp.handle = pRead->rpcMsg.handle; - rsp.code = code; - rsp.pCont = NULL; + rsp.code = code; + rsp.pCont = NULL; rpcSendResponse(&rsp); rpcFreeCont(pRead->rpcMsg.pCont); // free the received message } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index b453fdff17181570bd3a6f1940c3f4c3e638362f..f86691d3d9bc93c943cee9935199918d1ec12ca7 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -15,11 +15,11 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "taosmsg.h" #include "taoserror.h" #include "tlog.h" -#include "trpc.h" #include "tqueue.h" -#include "taosmsg.h" +#include "trpc.h" #include "dnodeWrite.h" #include "dnodeMgmt.h" @@ -31,7 +31,7 @@ typedef struct { typedef struct _write { void *pCont; - int contLen; + int32_t contLen; SRpcMsg rpcMsg; void *pVnode; // pointer to vnode SRpcContext *pRpcContext; // RPC message context @@ -40,12 +40,12 @@ typedef struct _write { typedef struct { taos_qset qset; // queue set pthread_t thread; // thread - int workerId; // worker ID + int32_t workerId; // worker ID } SWriteWorker; typedef struct _thread_obj { - int max; // max number of workers - int nextId; // from 0 to max-1, cyclic + int32_t max; // max number of workers + int32_t nextId; // from 0 to max-1, cyclic SWriteWorker *writeWorker; } SWriteWorkerPool; @@ -59,8 +59,8 @@ static void dnodeProcessDropTableMsg(SWriteMsg *pMsg); SWriteWorkerPool wWorkerPool; -int dnodeInitWrite() { - dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; +int32_t dnodeInitWrite() { + dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg; dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableMsg; dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessDropTableMsg; @@ -68,7 +68,7 @@ int dnodeInitWrite() { wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); if (wWorkerPool.writeWorker == NULL) return -1; - for (int i=0; icontLen; - char *pCont = (char *)pMsg->pCont; - int contLen = 0; - int numOfVnodes = 0; - int32_t vgId = 0; +void dnodeWrite(void *rpcMsg) { + SRpcMsg *pMsg = rpcMsg; + + int32_t leftLen = pMsg->contLen; + char *pCont = (char *) pMsg->pCont; + int32_t contLen = 0; + int32_t numOfVnodes = 0; + int32_t vgId = 0; SRpcContext *pRpcContext = NULL; // parse head, get number of vnodes; @@ -108,11 +108,11 @@ void dnodeWrite(SRpcMsg *pMsg) { // put message into queue SWriteMsg writeMsg; - writeMsg.rpcMsg = *pMsg; - writeMsg.pCont = pCont; - writeMsg.contLen = contLen; - writeMsg.pRpcContext = pRpcContext; - writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later + writeMsg.rpcMsg = *pMsg; + writeMsg.pCont = pCont; + writeMsg.contLen = contLen; + writeMsg.pRpcContext = pRpcContext; + writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later taos_queue queue = dnodeGetVnodeWworker(pVnode); taosWriteQitem(queue, &writeMsg); @@ -150,7 +150,6 @@ void *dnodeAllocateWriteWorker() { } void dnodeFreeWriteWorker(void *wqueue) { - taosCloseQueue(wqueue); // dynamically adjust the number of threads @@ -160,7 +159,7 @@ static void *dnodeProcessWriteQueue(void *param) { SWriteWorker *pWorker = (SWriteWorker *)param; taos_qall qall; SWriteMsg writeMsg; - int numOfMsgs; + int32_t numOfMsgs; while (1) { numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall); @@ -169,7 +168,7 @@ static void *dnodeProcessWriteQueue(void *param) { continue; } - for (int i=0; icode == 0) pRpcContext->code = terrno; } - int count = atomic_add_fetch_32(&pRpcContext->count, 1); + int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1); if (count < pRpcContext->numOfVnodes) { // not over yet, multiple vnodes return; @@ -226,15 +225,14 @@ static void dnodeProcessWriteResult(SWriteMsg *pWrite) { SRpcMsg rsp; rsp.handle = pWrite->rpcMsg.handle; - rsp.code = code; - rsp.pCont = NULL; + rsp.code = code; + rsp.pCont = NULL; rpcSendResponse(&rsp); rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message } static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { - - int num = taosGetQueueNumber(pWorker->qset); + int32_t num = taosGetQueueNumber(pWorker->qset); if (num > 0) { usleep(100); @@ -248,15 +246,12 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { - } static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { - } static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { - } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b9de3ec0b9d5cedf941e5e25e3a2033c1b27ac86..2c7db77ddfe6022cff536060c62a4d30cc303628 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -349,8 +349,8 @@ typedef struct { } SDRemoveSuperTableMsg; typedef struct { - int32_t vnode; -} SFreeVnodeMsg; + int32_t vgId; +} SDropVnodeMsg; typedef struct SColIndexEx { int16_t colId; diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 238eca25f8e554b49d6189c12d040896d07591fd..96fb226e54be136eb5379153bda90bfaefc3ede6 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -291,10 +291,10 @@ void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle); - SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); + SDropVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SDropVnodeMsg)); if (pFreeVnode != NULL) { pFreeVnode->vnode = htonl(vnode); - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DROP_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DROP_VNODE, pFreeVnode, sizeof(SDropVnodeMsg), ahandle); } }