From d60941d4e3a74e94f1f0f0fce44e5b06761aa879 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 28 Dec 2021 00:20:48 -0800 Subject: [PATCH] add bnode --- include/dnode/bnode/bnode.h | 10 +- include/dnode/snode/snode.h | 10 +- source/dnode/bnode/src/bnode.c | 4 +- source/dnode/mgmt/impl/inc/dndBnode.h | 2 +- source/dnode/mgmt/impl/inc/dndInt.h | 25 +- source/dnode/mgmt/impl/inc/dndWorker.h | 4 +- source/dnode/mgmt/impl/src/dndBnode.c | 369 +++++++++++++++++++++++++ source/dnode/mgmt/impl/src/dndQnode.c | 1 - source/dnode/mgmt/impl/src/dndSnode.c | 344 +++++++++++++++++++++++ source/dnode/mgmt/impl/src/dndWorker.c | 36 ++- source/dnode/snode/src/snode.c | 4 +- 11 files changed, 778 insertions(+), 31 deletions(-) create mode 100644 source/dnode/mgmt/impl/src/dndBnode.c create mode 100644 source/dnode/mgmt/impl/src/dndSnode.c diff --git a/include/dnode/bnode/bnode.h b/include/dnode/bnode/bnode.h index 74574f5462..23cc3ca617 100644 --- a/include/dnode/bnode/bnode.h +++ b/include/dnode/bnode/bnode.h @@ -49,10 +49,11 @@ typedef struct { /** * @brief Start one Bnode in Dnode. * + * @param path Path of the bnode. * @param pOption Option of the bnode. * @return SBnode* The bnode object. */ -SBnode *bndOpen(const SBnodeOpt *pOption); +SBnode *bndOpen(const char *path, const SBnodeOpt *pOption); /** * @brief Stop Bnode in Dnode. @@ -79,6 +80,13 @@ int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad); */ int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs); +/** + * @brief Drop a bnode. + * + * @param path Path of the bnode. + */ +void bndDestroy(const char *path); + #ifdef __cplusplus } #endif diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 1d30bd1e43..97069437f2 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -49,10 +49,11 @@ typedef struct { /** * @brief Start one Snode in Dnode. * + * @param path Path of the snode. * @param pOption Option of the snode. * @return SSnode* The snode object. */ -SSnode *sndOpen(const SSnodeOpt *pOption); +SSnode *sndOpen(const char *path, const SSnodeOpt *pOption); /** * @brief Stop Snode in Dnode. @@ -80,6 +81,13 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); */ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +/** + * @brief Drop a snode. + * + * @param path Path of the snode. + */ +void sndDestroy(const char *path); + #ifdef __cplusplus } #endif diff --git a/source/dnode/bnode/src/bnode.c b/source/dnode/bnode/src/bnode.c index 40b22dd58d..9570bc72a0 100644 --- a/source/dnode/bnode/src/bnode.c +++ b/source/dnode/bnode/src/bnode.c @@ -15,7 +15,7 @@ #include "bndInt.h" -SBnode *bndOpen(const SBnodeOpt *pOption) { +SBnode *bndOpen(const char *path, const SBnodeOpt *pOption) { SBnode *pBnode = calloc(1, sizeof(SBnode)); return pBnode; } @@ -25,3 +25,5 @@ void bndClose(SBnode *pBnode) { free(pBnode); } int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad) { return 0; } int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs) { return 0; } + +void bndDestroy(const char *path) {} diff --git a/source/dnode/mgmt/impl/inc/dndBnode.h b/source/dnode/mgmt/impl/inc/dndBnode.h index a350eae2d4..853b54ff69 100644 --- a/source/dnode/mgmt/impl/inc/dndBnode.h +++ b/source/dnode/mgmt/impl/inc/dndBnode.h @@ -24,7 +24,7 @@ extern "C" { int32_t dndInitBnode(SDnode *pDnode); void dndCleanupBnode(SDnode *pDnode); -ioid dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 0d37828ecd..ff96b7cfdf 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -54,18 +54,19 @@ extern int32_t dDebugFlag; #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat; -typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EDndWorkerType; +typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); typedef struct { - EDndWorkerType type; + EWorkerType type; const char *name; int32_t minNum; int32_t maxNum; - FProcessItem fp; + void *queueFp; SDnode *pDnode; taos_queue queue; SWorkerPool pool; + SMWorkerPool mpool; } SDnodeWorker; typedef struct { @@ -122,25 +123,21 @@ typedef struct { } SQnodeMgmt; typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - char *file; - SSnode *pSnode; - SRWLatch latch; - taos_queue pWriteQ; - SWorkerPool writePool; + int32_t refCount; + int8_t deployed; + int8_t dropped; + SSnode *pSnode; + SRWLatch latch; + SDnodeWorker writeWorker; } SSnodeMgmt; typedef struct { int32_t refCount; int8_t deployed; int8_t dropped; - char *file; SBnode *pBnode; SRWLatch latch; - taos_queue pWriteQ; - SMWorkerPool writePool; + SDnodeWorker writeWorker; } SBnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndWorker.h b/source/dnode/mgmt/impl/inc/dndWorker.h index 237c0518e8..49ef88e67d 100644 --- a/source/dnode/mgmt/impl/inc/dndWorker.h +++ b/source/dnode/mgmt/impl/inc/dndWorker.h @@ -21,8 +21,8 @@ extern "C" { #endif #include "dndInt.h" -int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, FProcessItem fp); +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, void *queueFp); void dndCleanupWorker(SDnodeWorker *pWorker); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c new file mode 100644 index 0000000000..b978c1102f --- /dev/null +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -0,0 +1,369 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndBnode.h" +#include "dndDnode.h" +#include "dndTransport.h" +#include "dndWorker.h" + +static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs); + +static SBnode *dndAcquireBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + SBnode *pBnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pMgmt->deployed && !pMgmt->dropped) { + refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); + pBnode = pMgmt->pBnode; + } else { + terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; + } + taosRUnLockLatch(&pMgmt->latch); + + if (pBnode != NULL) { + dTrace("acquire bnode, refCount:%d", refCount); + } + return pBnode; +} + +static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pBnode != NULL) { + refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); + } + taosRUnLockLatch(&pMgmt->latch); + + if (pBnode != NULL) { + dTrace("release bnode, refCount:%d", refCount); + } +} + +static int32_t dndReadBnodeFile(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); + if (fp == NULL) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_BNODE_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_BNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_BNODE_OVER; + } + + cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); + if (!deployed || deployed->type != cJSON_Number) { + dError("failed to read %s since deployed not found", file); + goto PRASE_BNODE_OVER; + } + pMgmt->deployed = deployed->valueint; + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_Number) { + dError("failed to read %s since dropped not found", file); + goto PRASE_BNODE_OVER; + } + pMgmt->dropped = dropped->valueint; + + code = 0; + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + +PRASE_BNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + terrno = code; + return code; +} + +static int32_t dndWriteBnodeFile(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "w"); + if (fp == NULL) { + terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; + dError("failed to write %s since %s", file, terrstr()); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + + if (taosRenameFile(file, file) != 0) { + terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; + dError("failed to rename %s since %s", file, terrstr()); + return -1; + } + + dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + return 0; +} + +static int32_t dndStartBnodeWorker(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "bnode-write", 0, 1, + (FProcessItem)dndProcessBnodeQueue) != 0) { + dError("failed to start bnode write worker since %s", terrstr()); + return -1; + } + + return 0; +} + +static void dndStopBnodeWorker(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + + taosWLockLatch(&pMgmt->latch); + pMgmt->deployed = 0; + taosWUnLockLatch(&pMgmt->latch); + + while (pMgmt->refCount > 1) { + taosMsleep(10); + } + + dndCleanupWorker(&pMgmt->writeWorker); +} + +static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { + pOption->pDnode = pDnode; + pOption->sendMsgToDnodeFp = dndSendMsgToDnode; + pOption->sendMsgToMnodeFp = dndSendMsgToMnode; + pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->dnodeId = dndGetDnodeId(pDnode); + pOption->clusterId = dndGetClusterId(pDnode); + pOption->cfg.sver = pDnode->opt.sver; +} + +static int32_t dndOpenBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + SBnodeOpt option = {0}; + dndBuildBnodeOption(pDnode, &option); + + SBnode *pBnode = bndOpen(pDnode->dir.bnode, &option); + if (pBnode == NULL) { + dError("failed to open bnode since %s", terrstr()); + return -1; + } + + if (dndStartBnodeWorker(pDnode) != 0) { + dError("failed to start bnode worker since %s", terrstr()); + bndClose(pBnode); + return -1; + } + + if (dndWriteBnodeFile(pDnode) != 0) { + dError("failed to write bnode file since %s", terrstr()); + dndStopBnodeWorker(pDnode); + bndClose(pBnode); + return -1; + } + + taosWLockLatch(&pMgmt->latch); + pMgmt->pBnode = pBnode; + pMgmt->deployed = 1; + taosWUnLockLatch(&pMgmt->latch); + + dInfo("bnode open successfully"); + return 0; +} + +static int32_t dndDropBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + + SBnode *pBnode = dndAcquireBnode(pDnode); + if (pBnode == NULL) { + dError("failed to drop bnode since %s", terrstr()); + return -1; + } + + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 1; + taosRUnLockLatch(&pMgmt->latch); + + if (dndWriteBnodeFile(pDnode) != 0) { + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 0; + taosRUnLockLatch(&pMgmt->latch); + + dndReleaseBnode(pDnode, pBnode); + dError("failed to drop bnode since %s", terrstr()); + return -1; + } + + dndReleaseBnode(pDnode, pBnode); + dndStopBnodeWorker(pDnode); + bndClose(pBnode); + pMgmt->pBnode = NULL; + bndDestroy(pDnode->dir.bnode); + + return 0; +} + +int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SCreateBnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + return -1; + } else { + return dndOpenBnode(pDnode); + } +} + +int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SDropBnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + return -1; + } else { + return dndDropBnode(pDnode); + } +} + +static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) { + for (int32_t i = 0; i < numOfMsgs; ++i) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + dndSendBnodeErrorRsp(pMsg, code); + } +} + +static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) { + SBnode *pBnode = dndAcquireBnode(pDnode); + if (pBnode == NULL) { + dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); + return; + } + + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); + if (pArray == NULL) { + dndReleaseBnode(pDnode, pBnode); + dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); + return; + } + + for (int32_t i = 0; i < numOfMsgs; ++i) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + void *ptr = taosArrayPush(pArray, &pMsg); + if (ptr == NULL) { + dndSendBnodeErrorRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); + } + } + + bndProcessWMsgs(pBnode, pArray); + + for (size_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + taosArrayDestroy(pArray); + dndReleaseBnode(pDnode, pBnode); +} + +static void dndWriteBnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; + + SBnode *pBnode = dndAcquireBnode(pDnode); + if (pBnode != NULL) { + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseBnode(pDnode, pBnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + +void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteBnodeMsgToWorker(pDnode, &pDnode->bmgmt.writeWorker, pMsg); +} + +int32_t dndInitBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + taosInitRWLatch(&pMgmt->latch); + + if (dndReadBnodeFile(pDnode) != 0) { + return -1; + } + + if (pMgmt->dropped) return 0; + if (!pMgmt->deployed) return 0; + + return dndOpenBnode(pDnode); +} + +void dndCleanupBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + if (pMgmt->pBnode) { + dndStopBnodeWorker(pDnode); + bndClose(pMgmt->pBnode); + pMgmt->pBnode = NULL; + } +} diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 8c76bf95a6..0b92de81c1 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -252,7 +252,6 @@ static int32_t dndDropQnode(SDnode *pDnode) { dndStopQnodeWorker(pDnode); qndClose(pQnode); pMgmt->pQnode = NULL; - // qndDestroy(pDnode->dir.qnode); return 0; } diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c new file mode 100644 index 0000000000..c1eb347350 --- /dev/null +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -0,0 +1,344 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndSnode.h" +#include "dndDnode.h" +#include "dndTransport.h" +#include "dndWorker.h" + +static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static SSnode *dndAcquireSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + SSnode *pSnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pMgmt->deployed && !pMgmt->dropped) { + refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); + pSnode = pMgmt->pSnode; + } else { + terrno = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + } + taosRUnLockLatch(&pMgmt->latch); + + if (pSnode != NULL) { + dTrace("acquire snode, refCount:%d", refCount); + } + return pSnode; +} + +static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pSnode != NULL) { + refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); + } + taosRUnLockLatch(&pMgmt->latch); + + if (pSnode != NULL) { + dTrace("release snode, refCount:%d", refCount); + } +} + +static int32_t dndReadSnodeFile(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t code = TSDB_CODE_DND_SNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); + if (fp == NULL) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_SNODE_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_SNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_SNODE_OVER; + } + + cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); + if (!deployed || deployed->type != cJSON_Number) { + dError("failed to read %s since deployed not found", file); + goto PRASE_SNODE_OVER; + } + pMgmt->deployed = deployed->valueint; + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_Number) { + dError("failed to read %s since dropped not found", file); + goto PRASE_SNODE_OVER; + } + pMgmt->dropped = dropped->valueint; + + code = 0; + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + +PRASE_SNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + terrno = code; + return code; +} + +static int32_t dndWriteSnodeFile(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "w"); + if (fp == NULL) { + terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR; + dError("failed to write %s since %s", file, terrstr()); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + + if (taosRenameFile(file, file) != 0) { + terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR; + dError("failed to rename %s since %s", file, terrstr()); + return -1; + } + + dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + return 0; +} + +static int32_t dndStartSnodeWorker(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, + (FProcessItem)dndProcessSnodeQueue) != 0) { + dError("failed to start snode write worker since %s", terrstr()); + return -1; + } + + return 0; +} + +static void dndStopSnodeWorker(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + + taosWLockLatch(&pMgmt->latch); + pMgmt->deployed = 0; + taosWUnLockLatch(&pMgmt->latch); + + while (pMgmt->refCount > 1) { + taosMsleep(10); + } + + dndCleanupWorker(&pMgmt->writeWorker); +} + +static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { + pOption->pDnode = pDnode; + pOption->sendMsgToDnodeFp = dndSendMsgToDnode; + pOption->sendMsgToMnodeFp = dndSendMsgToMnode; + pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->dnodeId = dndGetDnodeId(pDnode); + pOption->clusterId = dndGetClusterId(pDnode); + pOption->cfg.sver = pDnode->opt.sver; +} + +static int32_t dndOpenSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + SSnodeOpt option = {0}; + dndBuildSnodeOption(pDnode, &option); + + SSnode *pSnode = sndOpen(pDnode->dir.snode, &option); + if (pSnode == NULL) { + dError("failed to open snode since %s", terrstr()); + return -1; + } + + if (dndStartSnodeWorker(pDnode) != 0) { + dError("failed to start snode worker since %s", terrstr()); + sndClose(pSnode); + return -1; + } + + if (dndWriteSnodeFile(pDnode) != 0) { + dError("failed to write snode file since %s", terrstr()); + dndStopSnodeWorker(pDnode); + sndClose(pSnode); + return -1; + } + + taosWLockLatch(&pMgmt->latch); + pMgmt->pSnode = pSnode; + pMgmt->deployed = 1; + taosWUnLockLatch(&pMgmt->latch); + + dInfo("snode open successfully"); + return 0; +} + +static int32_t dndDropSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode == NULL) { + dError("failed to drop snode since %s", terrstr()); + return -1; + } + + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 1; + taosRUnLockLatch(&pMgmt->latch); + + if (dndWriteSnodeFile(pDnode) != 0) { + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 0; + taosRUnLockLatch(&pMgmt->latch); + + dndReleaseSnode(pDnode, pSnode); + dError("failed to drop snode since %s", terrstr()); + return -1; + } + + dndReleaseSnode(pDnode, pSnode); + dndStopSnodeWorker(pDnode); + sndClose(pSnode); + pMgmt->pSnode = NULL; + sndDestroy(pDnode->dir.snode); + + return 0; +} + +int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SCreateSnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + return -1; + } else { + return dndOpenSnode(pDnode); + } +} + +int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SDropSnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + return -1; + } else { + return dndDropSnode(pDnode); + } +} + +static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + SRpcMsg *pRsp = NULL; + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = sndProcessMsg(pSnode, pMsg, &pRsp); + } + + if (pRsp != NULL) { + pRsp->ahandle = pMsg->ahandle; + rpcSendResponse(pRsp); + free(pRsp); + } else { + if (code != 0) code = terrno; + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); + } + + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseSnode(pDnode, pSnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + +void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg); +} + +int32_t dndInitSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + taosInitRWLatch(&pMgmt->latch); + + if (dndReadSnodeFile(pDnode) != 0) { + return -1; + } + + if (pMgmt->dropped) return 0; + if (!pMgmt->deployed) return 0; + + return dndOpenSnode(pDnode); +} + +void dndCleanupSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + if (pMgmt->pSnode) { + dndStopSnodeWorker(pDnode); + sndClose(pMgmt->pSnode); + pMgmt->pSnode = NULL; + } +} diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index da0e3a9319..c421437e4d 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -16,9 +16,9 @@ #define _DEFAULT_SOURCE #include "dndWorker.h" -int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, FProcessItem fp) { - if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || fp == NULL) { +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, void *queueFp) { + if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) { terrno = TSDB_CODE_INVALID_PARA; return -1; } @@ -27,19 +27,32 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type pWorker->name = name; pWorker->minNum = minNum; pWorker->maxNum = maxNum; - pWorker->fp = fp; + pWorker->queueFp = queueFp; pWorker->pDnode = pDnode; if (pWorker->type == DND_WORKER_SINGLE) { SWorkerPool *pPool = &pWorker->pool; + pPool->name = name; pPool->min = minNum; pPool->max = maxNum; if (tWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - - pWorker->queue = tWorkerAllocQueue(&pPool, pDnode, fp); + pWorker->queue = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } else if (pWorker->type == DND_WORKER_MULTI) { + SMWorkerPool *pPool = &pWorker->mpool; + pPool->name = name; + pPool->max = maxNum; + if (tMWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->queue = tMWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -52,12 +65,17 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type } void dndCleanupWorker(SDnodeWorker *pWorker) { + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + if (pWorker->type == DND_WORKER_SINGLE) { - while (!taosQueueEmpty(pWorker->queue)) { - taosMsleep(10); - } tWorkerCleanup(&pWorker->pool); tWorkerFreeQueue(&pWorker->pool, pWorker->queue); + } else if (pWorker->type == DND_WORKER_MULTI) { + tWorkerCleanup(&pWorker->mpool); + tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); + } else { } } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3423ce41e2..7ae4d49059 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -15,7 +15,7 @@ #include "sndInt.h" -SSnode *sndOpen(const SSnodeOpt *pOption) { +SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = calloc(1, sizeof(SSnode)); return pSnode; } @@ -28,3 +28,5 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { *pRsp = NULL; return 0; } + +void sndDestroy(const char *path) {} \ No newline at end of file -- GitLab