From 041e6716010b2e641e076c2ed2b566200392bc38 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 28 Dec 2021 01:38:15 -0800 Subject: [PATCH] refact mnode queue --- include/dnode/mnode/mnode.h | 20 +- include/dnode/snode/snode.h | 2 +- source/dnode/mgmt/impl/inc/dndInt.h | 32 +- source/dnode/mgmt/impl/src/dndBnode.c | 22 +- source/dnode/mgmt/impl/src/dndMnode.c | 362 ++++---------------- source/dnode/mgmt/impl/src/dndQnode.c | 18 +- source/dnode/mgmt/impl/src/dndSnode.c | 22 +- source/dnode/mgmt/impl/src/dndWorker.c | 19 +- source/dnode/mgmt/impl/test/dnode/dnode.cpp | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 2 +- source/dnode/mnode/impl/src/mnode.c | 8 +- source/libs/parser/src/astToMsg.c | 1 + tests/script/unique/dnode/basic1.sim | 11 + 13 files changed, 157 insertions(+), 364 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index e0619b2133..a288e3e630 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -147,28 +147,12 @@ void mndCleanupMsg(SMnodeMsg *pMsg); void mndSendRsp(SMnodeMsg *pMsg, int32_t code); /** - * @brief Process the read request. + * @brief Process the read, write, sync request. * * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -void mndProcessReadMsg(SMnodeMsg *pMsg); - -/** - * @brief Process the write request. - * - * @param pMsg The request msg. - * @return int32_t 0 for success, -1 for failure. - */ -void mndProcessWriteMsg(SMnodeMsg *pMsg); - -/** - * @brief Process the sync request. - * - * @param pMsg The request msg. - * @return int32_t 0 for success, -1 for failure. - */ -void mndProcessSyncMsg(SMnodeMsg *pMsg); +void mndProcessMsg(SMnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 97069437f2..4913d2572f 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -79,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); * @param pRsp The response message * @return int32_t 0 for success, -1 for failure */ -int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); /** * @brief Drop a snode. diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index ff96b7cfdf..954e21aefa 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -65,8 +65,10 @@ typedef struct { void *queueFp; SDnode *pDnode; taos_queue queue; - SWorkerPool pool; - SMWorkerPool mpool; + union { + SWorkerPool pool; + SMWorkerPool mpool; + }; } SDnodeWorker; typedef struct { @@ -95,21 +97,17 @@ typedef struct { } SDnodeMgmt; typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - char *file; - SMnode *pMnode; - SRWLatch latch; - taos_queue pReadQ; - taos_queue pWriteQ; - taos_queue pSyncQ; - SWorkerPool readPool; - SWorkerPool writePool; - SWorkerPool syncPool; + int32_t refCount; + int8_t deployed; + int8_t dropped; + SMnode *pMnode; + SRWLatch latch; + SDnodeWorker readWorker; + SDnodeWorker writeWorker; + SDnodeWorker syncWorker; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; } SMnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index b978c1102f..992f6ac0a1 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -140,20 +140,22 @@ static int32_t dndWriteBnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartBnodeWorker(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "bnode-write", 0, 1, - (FProcessItem)dndProcessBnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -202,7 +204,9 @@ static int32_t dndOpenBnode(SDnode *pDnode) { return -1; } + pMgmt->deployed = 1; if (dndWriteBnodeFile(pDnode) != 0) { + pMgmt->deployed = 0; dError("failed to write bnode file since %s", terrstr()); dndStopBnodeWorker(pDnode); bndClose(pBnode); @@ -211,7 +215,6 @@ static int32_t dndOpenBnode(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->pBnode = pBnode; - pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); dInfo("bnode open successfully"); @@ -243,6 +246,8 @@ static int32_t dndDropBnode(SDnode *pDnode) { dndReleaseBnode(pDnode, pBnode); dndStopBnodeWorker(pDnode); + pMgmt->deployed = 0; + dndWriteBnodeFile(pDnode); bndClose(pBnode); pMgmt->pBnode = NULL; bndDestroy(pDnode->dir.bnode); @@ -353,7 +358,12 @@ int32_t dndInitBnode(SDnode *pDnode) { return -1; } - if (pMgmt->dropped) return 0; + if (pMgmt->dropped) { + dInfo("bnode has been deployed and needs to be deleted"); + bndDestroy(pDnode->dir.bnode); + return 0; + } + if (!pMgmt->deployed) return 0; return dndOpenBnode(pDnode); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 59f809489e..8fb95c0b75 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -17,42 +17,9 @@ #include "dndMnode.h" #include "dndDnode.h" #include "dndTransport.h" +#include "dndWorker.h" -static int32_t dndInitMnodeReadWorker(SDnode *pDnode); -static int32_t dndInitMnodeWriteWorker(SDnode *pDnode); -static int32_t dndInitMnodeSyncWorker(SDnode *pDnode); -static void dndCleanupMnodeReadWorker(SDnode *pDnode); -static void dndCleanupMnodeWriteWorker(SDnode *pDnode); -static void dndCleanupMnodeSyncWorker(SDnode *pDnode); -static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); -static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); -static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); -static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); -static void dndFreeMnodeReadQueue(SDnode *pDnode); -static void dndFreeMnodeWriteQueue(SDnode *pDnode); -static void dndFreeMnodeSyncQueue(SDnode *pDnode); -static void dndFreeMnodeMgmtQueue(SDnode *pDnode); - -static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); -void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); - -static int32_t dndStartMnodeWorker(SDnode *pDnode); -static void dndStopMnodeWorker(SDnode *pDnode); - -static SMnode *dndAcquireMnode(SDnode *pDnode); -static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode); - -static int32_t dndReadMnodeFile(SDnode *pDnode); -static int32_t dndWriteMnodeFile(SDnode *pDnode); - -static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption); -static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption); -static int32_t dndDropMnode(SDnode *pDnode); +static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg); static SMnode *dndAcquireMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -97,49 +64,52 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { char *content = calloc(1, maxLen + 1); cJSON *root = NULL; - FILE *fp = fopen(pMgmt->file, "r"); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); if (fp == NULL) { - dDebug("file %s not exist", pMgmt->file); + dDebug("file %s not exist", file); code = 0; goto PRASE_MNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", pMgmt->file); + dError("failed to read %s since content is null", file); goto PRASE_MNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", pMgmt->file); + dError("failed to read %s since invalid json format", file); goto PRASE_MNODE_OVER; } cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", pMgmt->file); + dError("failed to read %s since deployed not found", file); goto PRASE_MNODE_OVER; } pMgmt->deployed = deployed->valueint; cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", pMgmt->file); + dError("failed to read %s since dropped not found", file); goto PRASE_MNODE_OVER; } pMgmt->dropped = dropped->valueint; cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); if (!mnodes || mnodes->type != cJSON_Array) { - dError("failed to read %s since nodes not found", pMgmt->file); + dError("failed to read %s since nodes not found", file); goto PRASE_MNODE_OVER; } pMgmt->replica = cJSON_GetArraySize(mnodes); if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { - dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica); + dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); goto PRASE_MNODE_OVER; } @@ -151,28 +121,28 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { cJSON *id = cJSON_GetObjectItem(node, "id"); if (!id || id->type != cJSON_Number) { - dError("failed to read %s since id not found", pMgmt->file); + dError("failed to read %s since id not found", file); goto PRASE_MNODE_OVER; } pReplica->id = id->valueint; cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - dError("failed to read %s since fqdn not found", pMgmt->file); + dError("failed to read %s since fqdn not found", file); goto PRASE_MNODE_OVER; } tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); cJSON *port = cJSON_GetObjectItem(node, "port"); if (!port || port->type != cJSON_Number) { - dError("failed to read %s since port not found", pMgmt->file); + dError("failed to read %s since port not found", file); goto PRASE_MNODE_OVER; } pReplica->port = port->valueint; } code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); PRASE_MNODE_OVER: if (content != NULL) free(content); @@ -186,8 +156,8 @@ PRASE_MNODE_OVER: static int32_t dndWriteMnodeFile(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - char file[PATH_MAX + 20] = {0}; - snprintf(file, sizeof(file), "%s.bak", pMgmt->file); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/mnode.json.bak", pDnode->dir.dnode); FILE *fp = fopen(file, "w"); if (fp == NULL) { @@ -223,47 +193,36 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, pMgmt->file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", pMgmt->file, terrstr()); + dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartMnodeWorker(SDnode *pDnode) { - if (dndInitMnodeReadWorker(pDnode) != 0) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) { dError("failed to start mnode read worker since %s", terrstr()); return -1; } - if (dndInitMnodeWriteWorker(pDnode) != 0) { + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, dndProcessMnodeQueue) != 0) { dError("failed to start mnode write worker since %s", terrstr()); return -1; } - if (dndInitMnodeSyncWorker(pDnode) != 0) { + if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, dndProcessMnodeQueue) != 0) { dError("failed to start mnode sync worker since %s", terrstr()); return -1; } - if (dndAllocMnodeReadQueue(pDnode) != 0) { - dError("failed to alloc mnode read queue since %s", terrstr()); - return -1; - } - - if (dndAllocMnodeWriteQueue(pDnode) != 0) { - dError("failed to alloc mnode write queue since %s", terrstr()); - return -1; - } - - if (dndAllocMnodeSyncQueue(pDnode) != 0) { - dError("failed to alloc mnode sync queue since %s", terrstr()); - return -1; - } - return 0; } @@ -274,18 +233,13 @@ static void dndStopMnodeWorker(SDnode *pDnode) { pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); - while (pMgmt->refCount > 1) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10); - - dndCleanupMnodeReadWorker(pDnode); - dndCleanupMnodeWriteWorker(pDnode); - dndCleanupMnodeSyncWorker(pDnode); + while (pMgmt->refCount > 1) { + taosMsleep(10); + } - dndFreeMnodeReadQueue(pDnode); - dndFreeMnodeWriteQueue(pDnode); - dndFreeMnodeSyncQueue(pDnode); + dndCleanupWorker(&pMgmt->readWorker); + dndCleanupWorker(&pMgmt->writeWorker); + dndCleanupWorker(&pMgmt->syncWorker); } static bool dndNeedDeployMnode(SDnode *pDnode) { @@ -383,28 +337,21 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { dError("failed to open mnode since %s", terrstr()); return -1; } - pMgmt->deployed = 1; - int32_t code = dndWriteMnodeFile(pDnode); - if (code != 0) { - dError("failed to write mnode file since %s", terrstr()); - code = terrno; - pMgmt->deployed = 0; + if (dndStartMnodeWorker(pDnode) != 0) { + dError("failed to start mnode worker since %s", terrstr()); mndClose(pMnode); mndDestroy(pDnode->dir.mnode); - terrno = code; return -1; } - code = dndStartMnodeWorker(pDnode); - if (code != 0) { - dError("failed to start mnode worker since %s", terrstr()); - code = terrno; + pMgmt->deployed = 1; + if (dndWriteMnodeFile(pDnode) != 0) { + dError("failed to write mnode file since %s", terrstr()); pMgmt->deployed = 0; dndStopMnodeWorker(pDnode); mndClose(pMnode); mndDestroy(pDnode->dir.mnode); - terrno = code; return -1; } @@ -461,6 +408,7 @@ static int32_t dndDropMnode(SDnode *pDnode) { dndReleaseMnode(pDnode, pMnode); dndStopMnodeWorker(pDnode); + pMgmt->deployed = 0; dndWriteMnodeFile(pDnode); mndClose(pMnode); pMgmt->pMnode = NULL; @@ -528,13 +476,12 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } - -static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { +static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mndProcessReadMsg(pMsg); + mndProcessMsg(pMsg); dndReleaseMnode(pDnode, pMnode); } else { mndSendRsp(pMsg, terrno); @@ -543,208 +490,43 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { mndCleanupMsg(pMsg); } -static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; +static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) { + int32_t code = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mndProcessWriteMsg(pMsg); - dndReleaseMnode(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); - } - - mndCleanupMsg(pMsg); -} - -static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode != NULL) { - mndProcessSyncMsg(pMsg); - dndReleaseMnode(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); - } - - mndCleanupMsg(pMsg); -} - -static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { - SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (taosWriteQitem(pQueue, pMsg) != 0) { - mndCleanupMsg(pMsg); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); + SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); + if (pMsg == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + code = dndWriteMsgToWorker(pWorker, pMsg, 0); } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - dndReleaseMnode(pDnode, pMnode); -} - -void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); + if (code != 0) { + mndCleanupMsg(pMsg); } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; } - dndReleaseMnode(pDnode, pMnode); -} -void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + if (code != 0) { + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code}; rpcSendResponse(&rsp); } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - - dndReleaseMnode(pDnode, pMnode); -} - - -static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue); - if (pMgmt->pReadQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + rpcFreeCont(pRpcMsg->pCont); } - - return 0; -} - -static void dndFreeMnodeReadQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->readPool, pMgmt->pReadQ); - pMgmt->pReadQ = NULL; -} - -static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->readPool; - pPool->name = "mnode-read"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode read worker is initialized"); - return 0; -} - -static void dndCleanupMnodeReadWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->readPool); - dDebug("mnode read worker is closed"); -} - -static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue); - if (pMgmt->pWriteQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeMnodeWriteQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pWriteQ); - pMgmt->pWriteQ = NULL; } -static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->writePool; - pPool->name = "mnode-write"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode write worker is initialized"); - return 0; -} - -static void dndCleanupMnodeWriteWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->writePool); - dDebug("mnode write worker is closed"); -} - -static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue); - if (pMgmt->pSyncQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeMnodeSyncQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->syncPool, pMgmt->pSyncQ); - pMgmt->pSyncQ = NULL; +void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); } -static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->syncPool; - pPool->name = "mnode-sync"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode sync worker is initialized"); - return 0; +void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); } -static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->syncPool); - dDebug("mnode sync worker is closed"); +void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); } int32_t dndInitMnode(SDnode *pDnode) { @@ -752,14 +534,6 @@ int32_t dndInitMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); - char path[PATH_MAX]; - snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); - pMgmt->file = strdup(path); - if (pMgmt->file == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - if (dndReadMnodeFile(pDnode) != 0) { return -1; } @@ -790,13 +564,13 @@ int32_t dndInitMnode(SDnode *pDnode) { } void dndCleanupMnode(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - dInfo("dnode-mnode start to clean up"); - if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); - tfree(pMgmt->file); - mndClose(pMgmt->pMnode); - pMgmt->pMnode = NULL; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + if (pMgmt->pMnode) { + dndStopMnodeWorker(pDnode); + mndClose(pMgmt->pMnode); + pMgmt->pMnode = NULL; + } dInfo("dnode-mnode is cleaned up"); } diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 0b92de81c1..5d04a4f449 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -140,26 +140,27 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR; dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartQnodeWorker(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; - if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, - (FProcessItem)dndProcessQnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, dndProcessQnodeQueue) != 0) { dError("failed to start qnode query worker since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, - (FProcessItem)dndProcessQnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, dndProcessQnodeQueue) != 0) { dError("failed to start qnode fetch worker since %s", terrstr()); return -1; } @@ -209,7 +210,9 @@ static int32_t dndOpenQnode(SDnode *pDnode) { return -1; } + pMgmt->deployed = 1; if (dndWriteQnodeFile(pDnode) != 0) { + pMgmt->deployed = 0; dError("failed to write qnode file since %s", terrstr()); dndStopQnodeWorker(pDnode); qndClose(pQnode); @@ -218,7 +221,6 @@ static int32_t dndOpenQnode(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->pQnode = pQnode; - pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); dInfo("qnode open successfully"); @@ -250,6 +252,8 @@ static int32_t dndDropQnode(SDnode *pDnode) { dndReleaseQnode(pDnode, pQnode); dndStopQnodeWorker(pDnode); + pMgmt->deployed = 0; + dndWriteQnodeFile(pDnode); qndClose(pQnode); pMgmt->pQnode = NULL; diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index c1eb347350..151fc7e6a1 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -140,20 +140,22 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR; dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, - (FProcessItem)dndProcessSnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) { dError("failed to start snode write worker since %s", terrstr()); return -1; } @@ -202,7 +204,9 @@ static int32_t dndOpenSnode(SDnode *pDnode) { return -1; } + pMgmt->deployed = 1; if (dndWriteSnodeFile(pDnode) != 0) { + pMgmt->deployed = 0; dError("failed to write snode file since %s", terrstr()); dndStopSnodeWorker(pDnode); sndClose(pSnode); @@ -211,7 +215,6 @@ static int32_t dndOpenSnode(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->pSnode = pSnode; - pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); dInfo("snode open successfully"); @@ -243,6 +246,8 @@ static int32_t dndDropSnode(SDnode *pDnode) { dndReleaseSnode(pDnode, pSnode); dndStopSnodeWorker(pDnode); + pMgmt->deployed = 0; + dndWriteSnodeFile(pDnode); sndClose(pSnode); pMgmt->pSnode = NULL; sndDestroy(pDnode->dir.snode); @@ -328,7 +333,12 @@ int32_t dndInitSnode(SDnode *pDnode) { return -1; } - if (pMgmt->dropped) return 0; + if (pMgmt->dropped) { + dInfo("snode has been deployed and needs to be deleted"); + sndDestroy(pDnode->dir.snode); + return 0; + } + if (!pMgmt->deployed) return 0; return dndOpenSnode(pDnode); diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index c421437e4d..b1107fd185 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -73,7 +73,7 @@ void dndCleanupWorker(SDnodeWorker *pWorker) { tWorkerCleanup(&pWorker->pool); tWorkerFreeQueue(&pWorker->pool, pWorker->queue); } else if (pWorker->type == DND_WORKER_MULTI) { - tWorkerCleanup(&pWorker->mpool); + tMWorkerCleanup(&pWorker->mpool); tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); } else { } @@ -85,16 +85,23 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) return -1; } - void *pMsg = taosAllocateQitem(contLen); + void *pMsg = NULL; + if (contLen != 0) { + pMsg = taosAllocateQitem(contLen); + if (pMsg != NULL) { + memcpy(pMsg, pCont, contLen); + } + } else { + pMsg = pCont; + } + if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(pMsg, pCont, contLen); - - if (taosWriteQitem(pWorker, pMsg) != 0) { - taosFreeItem(pMsg); + if (taosWriteQitem(pWorker->queue, pMsg) != 0) { + taosFreeQitem(pMsg); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index dc352c5a3f..ec2c2d9a44 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -162,7 +162,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); strcpy(pReq->fqdn, "localhost"); - pReq->port = htonl(904); + pReq->port = htonl(9044); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 56559cbea1..2d236906e1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -388,7 +388,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * dnodeObj.updateTime = dnodeObj.createdTime; dnodeObj.port = pCreate->port; memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); - snprintf(dnodeObj.ep, "%s:%u", dnodeObj.fqdn, dnodeObj.port); + snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 64ea85044a..9281e46f4f 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -390,7 +390,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { rpcSendResponse(&rpcRsp); } -static void mndProcessRpcMsg(SMnodeMsg *pMsg) { +void mndProcessMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; tmsg_t msgType = pMsg->rpcMsg.msgType; @@ -451,12 +451,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } } -void mndProcessReadMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } - -void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } - -void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } - uint64_t mndGenerateUid(char *name, int32_t len) { int64_t us = taosGetTimestampUs(); int32_t hashval = MurmurHash3_32(name, len); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 2f80af225a..6c99411b71 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -428,6 +428,7 @@ SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf char* end = NULL; SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg)); pDrop->dnodeId = strtoll(pzName->z, &end, 10); + pDrop->dnodeId = htonl(pDrop->dnodeId); *len = sizeof(SDropDnodeMsg); if (end - pzName->z != pzName->n) { diff --git a/tests/script/unique/dnode/basic1.sim b/tests/script/unique/dnode/basic1.sim index 730864ef26..49b29a4ac8 100644 --- a/tests/script/unique/dnode/basic1.sim +++ b/tests/script/unique/dnode/basic1.sim @@ -94,5 +94,16 @@ if $rows != 2 then return -1 endi +print =============== drop dnode +sql drop dnode 2; +sql show dnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file -- GitLab