diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2519f4247d5a795939757f0b1ce31bd8bb5f0ba3..7ff52b83d56b21357ac1c418aa2d29ab8bfdf5a5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -805,19 +805,19 @@ typedef struct { int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; -} SCreateVnodeMsg, SAlterVnodeMsg; +} SCreateVnodeReq, SAlterVnodeReq; typedef struct { int32_t vgId; int32_t dnodeId; - char db[TSDB_DB_FNAME_LEN]; uint64_t dbUid; -} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; + char db[TSDB_DB_FNAME_LEN]; +} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq; typedef struct { int32_t vgId; int8_t accessState; -} SAuthVnodeMsg; +} SAuthVnodeReq; typedef struct { SMsgHead header; diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index af56d69b11f47ba2102ad3ab69ce5d84d349f1bd..f3ad0b017634c7dcc4a530d8e28060c480ad58f9 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -32,6 +32,8 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; typedef struct SVnodeCfg { + int32_t vgId; + /** vnode buffer pool options */ struct { /** write buffer size */ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 56f05ec0bfe2c5bf884790a5fba2e5b88811168a..80241405a60ce7d05291eb79d838b5fe9218551d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -277,9 +277,12 @@ int32_t* taosGetErrno(); #define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452) #define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453) #define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454) -#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0460) -#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0461) -#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0462) +#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0460) +#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0461) +#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0462) +#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0463) +#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0464) +#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0465) // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index bf5f0122c131e0df025665197449f82be2f7b244..b5fae629592986923476ac44ce28d908bd628554 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -29,12 +29,12 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index 61b220158fb7b78a8a20b97ff17b89fa401c3ff5..15be59a4193c8964a6509abf47beaa21b6d91a3e 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) { } static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - int32_t refCount = 0; + if (pBnode == NULL) return; + SBnodeMgmt *pMgmt = &pDnode->bmgmt; taosRLockLatch(&pMgmt->latch); - if (pBnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pBnode != NULL) { - dTrace("release bnode, refCount:%d", refCount); - } + dTrace("release bnode, refCount:%d", refCount); } static int32_t dndReadBnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index ae37967b3db960ca6005d03aba3a084b70d657b1..6c23af7f002970d10a89f3dfc441bc82ba2f8c2b 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { } static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t refCount = 0; + if (pMnode == NULL) return; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosRLockLatch(&pMgmt->latch); - if (pMnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pMnode != NULL) { - dTrace("release mnode, refCount:%d", refCount); - } + dTrace("release mnode, refCount:%d", refCount); } static int32_t dndReadMnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 3deee93e293a1cb117e84e551f7bb4c4aeab0552..9d2f623c4578aea7d4180a66c09710fc4bb320c5 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) { } static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - int32_t refCount = 0; + if (pQnode == NULL) return; + SQnodeMgmt *pMgmt = &pDnode->qmgmt; taosRLockLatch(&pMgmt->latch); - if (pQnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pQnode != NULL) { - dTrace("release qnode, refCount:%d", refCount); - } + dTrace("release qnode, refCount:%d", refCount); } static int32_t dndReadQnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index ab4e38bfb2be82d1a1438dd7f8d7f203463955fd..00435d4c3e45a4433defc7fdb28260195748bf00 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) { } static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - int32_t refCount = 0; + if (pSnode == NULL) return; + SSnodeMgmt *pMgmt = &pDnode->smgmt; taosRLockLatch(&pMgmt->latch); - if (pSnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pSnode != NULL) { - dTrace("release snode, refCount:%d", refCount); - } + dTrace("release snode, refCount:%d", refCount); } static int32_t dndReadSnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 6427ab080aaca2ed88d0dac3fd287d4c1770ea47..5198e351abed1980f745f5398e627a0facbc3b57 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -40,7 +40,7 @@ typedef struct { STaosQueue *pSyncQ; STaosQueue *pApplyQ; STaosQueue *pQueryQ; - STaosQueue* pFetchQ; + STaosQueue *pFetchQ; } SVnodeObj; typedef struct { @@ -53,22 +53,8 @@ typedef struct { SWrapperCfg *pCfgs; } SVnodeThread; -static int32_t dndInitVnodeReadWorker(SDnode *pDnode); -static int32_t dndInitVnodeWriteWorker(SDnode *pDnode); -static int32_t dndInitVnodeSyncWorker(SDnode *pDnode); -static void dndCleanupVnodeReadWorker(SDnode *pDnode); -static void dndCleanupVnodeWriteWorker(SDnode *pDnode); -static void dndCleanupVnodeSyncWorker(SDnode *pDnode); -static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); +static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode); +static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); @@ -117,11 +103,9 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { if (pVnode == NULL) return; SVnodesMgmt *pMgmt = &pDnode->vmgmt; - taosRLockLatch(&pMgmt->latch); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } @@ -134,7 +118,7 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { } pVnode->vgId = pCfg->vgId; - pVnode->refCount = 1; + pVnode->refCount = 0; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; @@ -148,23 +132,8 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeFetchQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeWriteQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeApplyQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeSyncQueue(pDnode, pVnode) != 0) { + if (dndAllocVnodeQueue(pDnode, pVnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -192,12 +161,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - dndFreeVnodeQueryQueue(pDnode, pVnode); - dndFreeVnodeFetchQueue(pDnode, pVnode); - dndFreeVnodeWriteQueue(pDnode, pVnode); - dndFreeVnodeApplyQueue(pDnode, pVnode); - dndFreeVnodeSyncQueue(pDnode, pVnode); - + dndFreeVnodeQueue(pDnode, pVnode); vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; @@ -412,7 +376,10 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnode *pImpl = vnodeOpen(pCfg->path, NULL); + SVnodeCfg vnodeCfg = {0}; + vnodeCfg.vgId = pCfg->vgId; + + SVnode *pImpl = vnodeOpen(pCfg->path, &vnodeCfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -508,7 +475,6 @@ static void dndCloseVnodes(SDnode *pDnode) { SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - dndReleaseVnode(pDnode, pVnodes[i]); dndCloseVnode(pDnode, pVnodes[i]); } @@ -524,8 +490,8 @@ static void dndCloseVnodes(SDnode *pDnode) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { - SCreateVnodeMsg *pCreate = rpcMsg->pCont; +static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) { + SCreateVnodeReq *pCreate = pReq->pCont; pCreate->vgId = htonl(pCreate->vgId); pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dbUid = htobe64(pCreate->dbUid); @@ -549,7 +515,8 @@ static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { return pCreate; } -static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { +static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { + pCfg->vgId = pCreate->vgId; pCfg->wsize = pCreate->cacheBlockSize; pCfg->ssize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize; @@ -572,7 +539,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { pCfg->walCfg.vgId = pCreate->vgId; } -static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) { +static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) { memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); pCfg->dbUid = pCreate->dbUid; pCfg->dropped = 0; @@ -581,20 +548,20 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWra pCfg->vgVersion = pCreate->vgVersion; } -static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { - SDropVnodeMsg *pDrop = rpcMsg->pCont; +static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *pReq) { + SDropVnodeReq *pDrop = pReq->pCont; pDrop->vgId = htonl(pDrop->vgId); return pDrop; } -static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { - SAuthVnodeMsg *pAuth = rpcMsg->pCont; +static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *pReq) { + SAuthVnodeReq *pAuth = pReq->pCont; pAuth->vgId = htonl(pAuth->vgId); return pAuth; } -int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg); +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq); dDebug("vgId:%d, create vnode req is received", pCreate->vgId); SVnodeCfg vnodeCfg = {0}; @@ -607,16 +574,19 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { if (pVnode != NULL) { dDebug("vgId:%d, already exist, return success", pCreate->vgId); dndReleaseVnode(pDnode, pVnode); + terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; return 0; } - SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/); + SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { + dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); return -1; } int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl); if (code != 0) { + dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr()); vnodeClose(pImpl); vnodeDestroy(wrapperCfg.path); terrno = code; @@ -634,23 +604,20 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg); +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq); dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); SVnodeCfg vnodeCfg = {0}; dndGenerateVnodeCfg(pAlter, &vnodeCfg); - SWrapperCfg wrapperCfg = {0}; - dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg); - SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); - return terrno; + return -1; } - if (wrapperCfg.vgVersion == pVnode->vgVersion) { + if (pAlter->vgVersion == pVnode->vgVersion) { dndReleaseVnode(pDnode, pVnode); dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId); return 0; @@ -659,11 +626,11 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); - return terrno; + return -1; } int32_t oldVersion = pVnode->vgVersion; - pVnode->vgVersion = wrapperCfg.vgVersion; + pVnode->vgVersion = pAlter->vgVersion; int32_t code = dndWriteVnodesToFile(pDnode); if (code != 0) { pVnode->vgVersion = oldVersion; @@ -673,8 +640,8 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return code; } -int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDropVnodeReq *pDrop = vnodeParseDropVnodeReq(pReq); int32_t vgId = pDrop->vgId; dDebug("vgId:%d, drop vnode req is received", vgId); @@ -688,10 +655,10 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { pVnode->dropped = 1; if (dndWriteVnodesToFile(pDnode) != 0) { pVnode->dropped = 0; - return terrno; + dndReleaseVnode(pDnode, pVnode); + return -1; } - dndReleaseVnode(pDnode, pVnode); dndCloseVnode(pDnode, pVnode); vnodeClose(pVnode->pImpl); vnodeDestroy(pVnode->path); @@ -700,17 +667,16 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SAuthVnodeReq *pAuth = (SAuthVnodeReq *)vnodeParseAuthVnodeReq(pReq); - int32_t code = 0; int32_t vgId = pAuth->vgId; dDebug("vgId:%d, auth vnode req is received", vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to auth since %s", vgId, terrstr()); - return terrno; + return -1; } pVnode->accessState = pAuth->accessState; @@ -718,30 +684,30 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SSyncVnodeReq *pSync = (SSyncVnodeReq *)vnodeParseDropVnodeReq(pReq); - int32_t vgId = pAuth->vgId; - dDebug("vgId:%d, auth vnode req is received", vgId); + int32_t vgId = pSync->vgId; + dDebug("vgId:%d, sync vnode req is received", vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { - dDebug("vgId:%d, failed to auth since %s", vgId, terrstr()); - return terrno; + dDebug("vgId:%d, failed to sync since %s", vgId, terrstr()); + return -1; } if (vnodeSync(pVnode->pImpl) != 0) { - dError("vgId:%d, failed to auth vnode since %s", vgId, terrstr()); + dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); - return terrno; + return -1; } dndReleaseVnode(pDnode, pVnode); return 0; } -int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SCompactVnodeReq *pCompact = (SCompactVnodeReq *)vnodeParseDropVnodeReq(pReq); int32_t vgId = pCompact->vgId; dDebug("vgId:%d, compact vnode req is received", vgId); @@ -749,13 +715,13 @@ int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to compact since %s", vgId, terrstr()); - return terrno; + return -1; } if (vnodeCompact(pVnode->pImpl) != 0) { dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); - return terrno; + return -1; } dndReleaseVnode(pDnode, pVnode); @@ -814,6 +780,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_ for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); + // todo SRpcMsg *pRsp = NULL; (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); } @@ -825,6 +792,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); + // todo SRpcMsg *pRsp = NULL; (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); } @@ -848,21 +816,25 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) } if (code != TSDB_CODE_SUCCESS) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; - rpcSendResponse(&rsp); + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + } rpcFreeCont(pRpcMsg->pCont); } } static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { - SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); if (pVnode == NULL) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; - rpcSendResponse(&rsp); + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; + rpcSendResponse(&rsp); + } rpcFreeCont(pMsg->pCont); } @@ -903,142 +875,69 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); - if (pVnode == NULL) { - return -1; - } + if (pVnode == NULL) return -1; int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); dndReleaseVnode(pDnode, pVnode); return code; } -static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); - if (pVnode->pQueryQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - pVnode->pQueryQ = NULL; -} - -static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); - if (pVnode->pFetchQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); - pVnode->pFetchQ = NULL; -} - -static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { +static int32_t dndInitVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; int32_t maxFetchThreads = 4; - float threadsForQuery = MAX(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores, 1); + int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->opt.numOfCores); + int32_t minQueryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1); + int32_t maxQueryThreads = minQueryThreads; + int32_t maxWriteThreads = MAX(pDnode->opt.numOfCores, 1); + int32_t maxSyncThreads = MAX(pDnode->opt.numOfCores / 2, 1); SWorkerPool *pPool = &pMgmt->queryPool; pPool->name = "vnode-query"; - pPool->min = (int32_t)threadsForQuery; - pPool->max = pPool->min; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + pPool->min = minQueryThreads; + pPool->max = maxQueryThreads; + if (tWorkerInit(pPool) != 0) return -1; pPool = &pMgmt->fetchPool; pPool->name = "vnode-fetch"; - pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores); - pPool->max = pPool->min; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + pPool->min = minFetchThreads; + pPool->max = maxFetchThreads; + if (tWorkerInit(pPool) != 0) return -1; + + SMWorkerPool *pMPool = &pMgmt->writePool; + pMPool->name = "vnode-write"; + pMPool->max = maxWriteThreads; + if (tMWorkerInit(pMPool) != 0) return -1; - dDebug("vnode read worker is initialized"); + pMPool = &pMgmt->syncPool; + pMPool->name = "vnode-sync"; + pMPool->max = maxSyncThreads; + if (tMWorkerInit(pMPool) != 0) return -1; + + dDebug("vnode workers is initialized"); return 0; } -static void dndCleanupVnodeReadWorker(SDnode *pDnode) { +static void dndCleanupVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tWorkerCleanup(&pMgmt->fetchPool); tWorkerCleanup(&pMgmt->queryPool); - dDebug("vnode close worker is initialized"); -} - -static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); - if (pVnode->pWriteQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; + tMWorkerCleanup(&pMgmt->writePool); + tMWorkerCleanup(&pMgmt->syncPool); + dDebug("vnode workers is closed"); } -static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { +static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - pVnode->pWriteQ = NULL; -} -static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; + pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); - if (pVnode->pApplyQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); - pVnode->pApplyQ = NULL; -} - -static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->writePool; - pPool->name = "vnode-write"; - pPool->max = pDnode->opt.numOfCores; - if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode write worker is initialized"); - return 0; -} - -static void dndCleanupVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerCleanup(&pMgmt->writePool); - dDebug("vnode write worker is closed"); -} - -static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); - if (pVnode->pSyncQ == NULL) { + pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); + pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); + + if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || + pVnode->pQueryQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -1046,50 +945,26 @@ static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { return 0; } -static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { +static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; + tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); + tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); + pVnode->pWriteQ = NULL; + pVnode->pApplyQ = NULL; pVnode->pSyncQ = NULL; -} - -static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { - int32_t maxThreads = pDnode->opt.numOfCores / 2; - if (maxThreads < 1) maxThreads = 1; - - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->syncPool; - pPool->name = "vnode-sync"; - pPool->max = maxThreads; - if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode sync worker is initialized"); - return 0; -} - -static void dndCleanupVnodeSyncWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerCleanup(&pMgmt->syncPool); - dDebug("vnode sync worker is closed"); + pVnode->pFetchQ = NULL; + pVnode->pQueryQ = NULL; } int32_t dndInitVnodes(SDnode *pDnode) { dInfo("dnode-vnodes start to init"); - if (dndInitVnodeReadWorker(pDnode) != 0) { - dError("failed to init vnodes read worker since %s", terrstr()); - return -1; - } - - if (dndInitVnodeWriteWorker(pDnode) != 0) { - dError("failed to init vnodes write worker since %s", terrstr()); - return -1; - } - - if (dndInitVnodeSyncWorker(pDnode) != 0) { - dError("failed to init vnodes sync worker since %s", terrstr()); + if (dndInitVnodeWorkers(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + dError("failed to init vnode workers since %s", terrstr()); return -1; } @@ -1105,9 +980,7 @@ int32_t dndInitVnodes(SDnode *pDnode) { void dndCleanupVnodes(SDnode *pDnode) { dInfo("dnode-vnodes start to clean up"); dndCloseVnodes(pDnode); - dndCleanupVnodeReadWorker(pDnode); - dndCleanupVnodeWriteWorker(pDnode); - dndCleanupVnodeSyncWorker(pDnode); + dndCleanupVnodeWorkers(pDnode); dInfo("dnode-vnodes is cleaned up"); } diff --git a/source/dnode/mgmt/impl/test/CMakeLists.txt b/source/dnode/mgmt/impl/test/CMakeLists.txt index 8f721dae9a5433f272352204bbbc776e525f9c80..ce93a14d3fc30e6d22877e28a4d04f4e3618a349 100644 --- a/source/dnode/mgmt/impl/test/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/CMakeLists.txt @@ -4,8 +4,5 @@ add_subdirectory(qnode) add_subdirectory(bnode) add_subdirectory(snode) add_subdirectory(mnode) -add_subdirectory(db) -add_subdirectory(stb) -add_subdirectory(vgroup) - +add_subdirectory(vnode) add_subdirectory(sut) diff --git a/source/dnode/mgmt/impl/test/db/CMakeLists.txt b/source/dnode/mgmt/impl/test/db/CMakeLists.txt deleted file mode 100644 index cb9f1600fc2a76951a5cb81cf94c5d1ef48d5eb1..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/impl/test/db/CMakeLists.txt +++ /dev/null @@ -1,11 +0,0 @@ -aux_source_directory(. DB_SRC) -add_executable(dnode_test_db ${DB_SRC}) -target_link_libraries( - dnode_test_db - PUBLIC sut -) - -add_test( - NAME dnode_test_db - COMMAND dnode_test_db -) diff --git a/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt b/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt deleted file mode 100644 index b864b0593c26d15c4468c381f95c1ac337e1174f..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt +++ /dev/null @@ -1,11 +0,0 @@ -aux_source_directory(. VGROUP_SRC) -add_executable(dnode_test_vgroup ${VGROUP_SRC}) -target_link_libraries( - dnode_test_vgroup - PUBLIC sut -) - -add_test( - NAME dnode_test_vgroup - COMMAND dnode_test_vgroup -) diff --git a/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt b/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6fb8bb4ba4a599ecad81314c86ff061442a768d3 --- /dev/null +++ b/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt @@ -0,0 +1,11 @@ +aux_source_directory(. VNODE_SRC) +add_executable(dnode_test_vnode ${VNODE_SRC}) +target_link_libraries( + dnode_test_vnode + PUBLIC sut +) + +add_test( + NAME dnode_test_vnode + COMMAND dnode_test_vnode +) diff --git a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp b/source/dnode/mgmt/impl/test/vnode/vnode.cpp similarity index 83% rename from source/dnode/mgmt/impl/test/vgroup/vgroup.cpp rename to source/dnode/mgmt/impl/test/vnode/vnode.cpp index 7fa3b4ab6167b29a7eb63cbd2391fecc9e657700..72331371402fd88e6d6e3c80bae536468ac83a45 100644 --- a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp +++ b/source/dnode/mgmt/impl/test/vnode/vnode.cpp @@ -11,9 +11,9 @@ #include "sut.h" -class DndTestVgroup : public ::testing::Test { +class DndTestVnode : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vgroup", 9150); } + static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vnode", 9150); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -23,14 +23,14 @@ class DndTestVgroup : public ::testing::Test { void TearDown() override {} }; -Testbase DndTestVgroup::test; +Testbase DndTestVnode::test; -TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { +TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) { { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SCreateVnodeMsg); + int32_t contLen = sizeof(SCreateVnodeReq); - SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(contLen); + SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen); pReq->vgId = htonl(2); pReq->dnodeId = htonl(1); strcpy(pReq->db, "1.d1"); @@ -68,9 +68,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SAlterVnodeMsg); + int32_t contLen = sizeof(SAlterVnodeReq); - SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(contLen); + SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen); pReq->vgId = htonl(2); pReq->dnodeId = htonl(1); strcpy(pReq->db, "1.d1"); @@ -108,9 +108,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SDropVnodeMsg); + int32_t contLen = sizeof(SDropVnodeReq); - SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(contLen); + SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen); pReq->vgId = htonl(2); pReq->dnodeId = htonl(1); strcpy(pReq->db, "1.d1"); @@ -118,7 +118,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SDropVnodeMsg); + rpcMsg.contLen = sizeof(SDropVnodeReq); rpcMsg.msgType = TDMT_DND_DROP_VNODE; SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen); diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 6d391450b7c37833e8382c850f7f28ec0132d7e7..9e4656fec812a1d1c1a2c13ab6bc1f4870397e80 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); -SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); -SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 251cbb921786eed7e5ddb4f417cbf8f37c985515..bf5d01d0a261ef4d392c651e7ea1c969a6acd8a2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -331,11 +331,11 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SCreateVnodeMsg); + action.contLen = sizeof(SCreateVnodeReq); action.msgType = TDMT_DND_CREATE_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -360,11 +360,11 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SDropVnodeMsg); + action.contLen = sizeof(SDropVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; if (mndTransAppendUndoAction(pTrans, &action) != 0) { free(pMsg); @@ -593,11 +593,11 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SAlterVnodeMsg *pMsg = (SAlterVnodeMsg *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SAlterVnodeMsg); + action.contLen = sizeof(SAlterVnodeReq); action.msgType = TDMT_DND_ALTER_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -757,11 +757,11 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SCreateVnodeMsg); + action.contLen = sizeof(SCreateVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index bd17c6d15077ad0f303a1096e0bcf39b10eb021c..e9d35a6e4c82e14801abec69850c92b42805184b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -189,8 +189,8 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } -SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SCreateVnodeMsg *pCreate = calloc(1, sizeof(SCreateVnodeMsg)); +SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq)); if (pCreate == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -248,8 +248,8 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb return pCreate; } -SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SDropVnodeMsg *pDrop = calloc(1, sizeof(SDropVnodeMsg)); +SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq)); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index 4d37bb368d678fc589a97f472a9915467f73a6ce..3ca35d58a7cf8bbfdf29aac97390f8cf815f397f 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -10,3 +10,5 @@ add_subdirectory(show) add_subdirectory(profile) add_subdirectory(dnode) add_subdirectory(mnode) +add_subdirectory(db) +add_subdirectory(stb) diff --git a/source/dnode/mnode/impl/test/db/CMakeLists.txt b/source/dnode/mnode/impl/test/db/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f0abdf152cd5b081506a40d2f3b524560b47505c --- /dev/null +++ b/source/dnode/mnode/impl/test/db/CMakeLists.txt @@ -0,0 +1,11 @@ +aux_source_directory(. DB_SRC) +add_executable(mnode_test_db ${DB_SRC}) +target_link_libraries( + mnode_test_db + PUBLIC sut +) + +add_test( + NAME mnode_test_db + COMMAND mnode_test_db +) diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp similarity index 95% rename from source/dnode/mgmt/impl/test/db/db.cpp rename to source/dnode/mnode/impl/test/db/db.cpp index 3a69ae23054f3e44df87bbf1d07f562ad05f97c7..42cf753c7cc128480cce5c028f8b5c55d6e20eba 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -1,19 +1,19 @@ /** * @file db.cpp * @author slguan (slguan@taosdata.com) - * @brief DNODE module db-msg tests - * @version 0.1 - * @date 2021-12-15 + * @brief MNODE module db tests + * @version 1.0 + * @date 2022-01-11 * - * @copyright Copyright (c) 2021 + * @copyright Copyright (c) 2022 * */ #include "sut.h" -class DndTestDb : public ::testing::Test { +class MndTestDb : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_db", 9040); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -23,9 +23,9 @@ class DndTestDb : public ::testing::Test { void TearDown() override {} }; -Testbase DndTestDb::test; +Testbase MndTestDb::test; -TEST_F(DndTestDb, 01_ShowDb) { +TEST_F(MndTestDb, 01_ShowDb) { test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); CHECK_META("show databases", 18); CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name"); @@ -51,7 +51,7 @@ TEST_F(DndTestDb, 01_ShowDb) { EXPECT_EQ(test.GetShowRows(), 0); } -TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { +TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { { int32_t contLen = sizeof(SCreateDbMsg); @@ -211,7 +211,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { EXPECT_EQ(test.GetShowRows(), 0); } -TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { +TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { { int32_t contLen = sizeof(SCreateDbMsg); @@ -281,7 +281,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(pInfo->numOfEps, 1); SEpAddrMsg* pAddr = &pInfo->epAddr[0]; pAddr->port = htons(pAddr->port); - EXPECT_EQ(pAddr->port, 9040); + EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } @@ -297,7 +297,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(pInfo->numOfEps, 1); SEpAddrMsg* pAddr = &pInfo->epAddr[0]; pAddr->port = htons(pAddr->port); - EXPECT_EQ(pAddr->port, 9040); + EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } } diff --git a/source/dnode/mgmt/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt similarity index 100% rename from source/dnode/mgmt/impl/test/stb/CMakeLists.txt rename to source/dnode/mnode/impl/test/stb/CMakeLists.txt diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp similarity index 95% rename from source/dnode/mgmt/impl/test/stb/stb.cpp rename to source/dnode/mnode/impl/test/stb/stb.cpp index d3362c7a9bc7df3fce5a591e76fbf6da692f6119..55cc0301229465b64282feba41643f13b90b398d 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -1,19 +1,19 @@ /** * @file stb.cpp * @author slguan (slguan@taosdata.com) - * @brief DNODE module db-msg tests - * @version 0.1 - * @date 2021-12-17 + * @brief MNODE module stb tests + * @version 1.0 + * @date 2022-01-12 * - * @copyright Copyright (c) 2021 + * @copyright Copyright (c) 2022 * */ #include "sut.h" -class DndTestStb : public ::testing::Test { +class MndTestStb : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_stb", 9101); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_stb", 9034); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -23,9 +23,9 @@ class DndTestStb : public ::testing::Test { void TearDown() override {} }; -Testbase DndTestStb::test; +Testbase MndTestStb::test; -TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { +TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { { int32_t contLen = sizeof(SCreateDbMsg); diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index 88175646ae95739860fce867a0408b80e356f168..995bed6e0bedeccc6d2d5aaf8776e7aaa2271fa9 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -24,9 +24,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - if (pVnodeCfg == NULL) { + //if (pVnodeCfg == NULL) { pVnodeCfg = &defaultVnodeOptions; - } + //} // Validate options if (vnodeValidateOptions(pVnodeCfg) < 0) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index eaaaf282970370ba4e44da43f70f766cf0b3cb2b..3ea564722b12353f896d823e2d24947819519130 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -277,9 +277,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_NOT_DEPLOYED, "Bnode not deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_INVALID_OPTION, "Bnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_READ_FILE_ERROR, "Read bnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR, "Write bnode.json error") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnode directories") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED, "Vnode already deployed") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_NOT_DEPLOYED, "Vnode not deployed") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_INVALID_OPTION, "Vnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_READ_FILE_ERROR, "Read vnodes.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR, "Write vnodes.json error") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnodes") // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress")