From f31a7b4683fb21d61d0dcd617aea3bbbfffe34cf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 17 Mar 2022 10:48:39 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/bnode/inc/bm.h | 35 ++++ source/dnode/mgmt/bnode/inc/bmInt.h | 70 ++++--- source/dnode/mgmt/bnode/src/bmFile.c | 111 +++++++++++ source/dnode/mgmt/bnode/src/bmInt.c | 197 +++++++++++++++++- source/dnode/mgmt/bnode/src/bmMgmt.c | 264 +------------------------ source/dnode/mgmt/bnode/src/bmMsg.c | 39 +++- source/dnode/mgmt/bnode/src/bmWorker.c | 0 source/dnode/mgmt/mnode/src/mmFile.c | 2 +- source/dnode/mgmt/mnode/src/mmInt.c | 10 +- 9 files changed, 435 insertions(+), 293 deletions(-) create mode 100644 source/dnode/mgmt/bnode/inc/bm.h delete mode 100644 source/dnode/mgmt/bnode/src/bmWorker.c diff --git a/source/dnode/mgmt/bnode/inc/bm.h b/source/dnode/mgmt/bnode/inc/bm.h new file mode 100644 index 0000000000..e402a20561 --- /dev/null +++ b/source/dnode/mgmt/bnode/inc/bm.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_BNODE_H_ +#define _TD_DND_BNODE_H_ + +#include "dnd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void bmGetMgmtFp(SMgmtWrapper *pWrapper); +void bmInitMsgHandles(SMgmtWrapper *pWrapper); + +int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); +int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_BNODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index 3427f670a8..9988bedb52 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -16,48 +16,62 @@ #ifndef _TD_DND_BNODE_INT_H_ #define _TD_DND_BNODE_INT_H_ -#include "dnd.h" +#include "mm.h" +#include "dm.h" #ifdef __cplusplus extern "C" { #endif - typedef struct SBnodeMgmt { - int32_t refCount; - int8_t deployed; - int8_t dropped; - SBnode *pBnode; - SRWLatch latch; - SDnodeWorker writeWorker; - SProcObj *pProcess; - bool singleProc; + int32_t refCount; + int8_t deployed; + int8_t dropped; + SBnode *pBnode; + SDnode *pDnode; + SMgmtWrapper *pWrapper; + const char *path; + SRWLatch latch; + SDnodeWorker writeWorker; } SBnodeMgmt; -void bmGetMgmtFp(SMgmtWrapper *pMgmt); +// mmFile.c +int32_t bmReadFile(SBnodeMgmt *pMgmt); +int32_t bmWriteFile(SBnodeMgmt *pMgmt); + +SBnode *bmAcquire(SBnodeMgmt *pMgmt); +void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode); + +// SBnode *mmAcquire(SMnodeMgmt *pMgmt); +// void mmRelease(SMnodeMgmt *pMgmt, SBnode *pMnode); +// int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption); +// int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption); +// int32_t mmDrop(SMnodeMgmt *pMgmt); + -int32_t dndInitBnode(SDnode *pDnode); -void dndCleanupBnode(SDnode *pDnode); +// void bmGetMgmtFp(SMgmtWrapper *pMgmt); -void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); -int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); +// int32_t dndInitBnode(SDnode *pDnode); +// void dndCleanupBnode(SDnode *pDnode); -void bmInitMsgHandles(SMgmtWrapper *pWrapper); +// void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +// int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); +// int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); -int32_t bmStartWorker(SDnode *pDnode); -void bmStopWorker(SDnode *pDnode); -void bmInitMsgFp(SMnodeMgmt *pMgmt); -void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +// void bmInitMsgHandles(SMgmtWrapper *pWrapper); -void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +// int32_t bmStartWorker(SDnode *pDnode); +// void bmStopWorker(SDnode *pDnode); +// void bmInitMsgFp(SMnodeMgmt *pMgmt); +// void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +// int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +// int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +// void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +// void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +// void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +// void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +// void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/src/bmFile.c b/source/dnode/mgmt/bnode/src/bmFile.c index e69de29bb2..d5f989fc6e 100644 --- a/source/dnode/mgmt/bnode/src/bmFile.c +++ b/source/dnode/mgmt/bnode/src/bmFile.c @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "bmInt.h" + +int32_t bmReadFile(SBnodeMgmt *pMgmt) { + int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 1024; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + char file[PATH_MAX]; + TdFilePtr pFile = NULL; + + snprintf(file, sizeof(file), "%s%sbnode.json", pMgmt->path, TD_DIRSEP); + pFile = taosOpenFile(file, TD_FILE_READ); + if (pFile == NULL) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_BNODE_OVER; + } + + len = (int32_t)taosReadFile(pFile, content, maxLen); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_BNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_BNODE_OVER; + } + + cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); + if (!deployed || deployed->type != cJSON_Number) { + dError("failed to read %s since deployed not found", file); + goto PRASE_BNODE_OVER; + } + pMgmt->deployed = deployed->valueint; + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_Number) { + dError("failed to read %s since dropped not found", file); + goto PRASE_BNODE_OVER; + } + pMgmt->dropped = dropped->valueint; + + code = 0; + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + +PRASE_BNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (pFile != NULL) taosCloseFile(&pFile); + + terrno = code; + return code; +} + +int32_t bmWriteFile(SBnodeMgmt *pMgmt) { + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s%sbnode.json", pMgmt->path, TD_DIRSEP); + + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; + dError("failed to write %s since %s", file, terrstr()); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 1024; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, "}\n"); + + taosWriteFile(pFile, content, len); + taosFsyncFile(pFile); + taosCloseFile(&pFile); + free(content); + + char realfile[PATH_MAX]; + snprintf(realfile, sizeof(realfile), "%s%sbnode.json", pMgmt->path); + + if (taosRenameFile(file, realfile) != 0) { + terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; + dError("failed to rename %s since %s", file, terrstr()); + return -1; + } + + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); + return 0; +} diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index ac696b4638..7ffeb1a8da 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -16,15 +16,202 @@ #define _DEFAULT_SOURCE #include "bmInt.h" -bool bmRequireNode(SMgmtWrapper *pWrapper) { return false; } +SBnode *bmAcquire(SBnodeMgmt *pMgmt) { + SBnode *pBnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pBnode != NULL) { + refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); + pBnode = pMgmt->pBnode; + } else { + terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; + } + taosRUnLockLatch(&pMgmt->latch); + + if (pBnode != NULL) { + dTrace("acquire bnode, refCount:%d", refCount); + } + return pBnode; +} + +void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode) { + if (pBnode == NULL) return; + + taosRLockLatch(&pMgmt->latch); + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); + taosRUnLockLatch(&pMgmt->latch); + dTrace("release bnode, refCount:%d", refCount); +} + +static bool bmRequire(SMgmtWrapper *pWrapper) { + SBnodeMgmt mgmt = {0}; + mgmt.path = pWrapper->path; + if (mmReadFile(&mgmt) != 0) { + return false; + } + + if (mgmt.dropped) { + dInfo("bnode has been dropped and needs to be deleted"); + mndDestroy(mgmt.path); + return false; + } + + if (mgmt.deployed) { + dInfo("bnode has been deployed"); + return true; + } + + bool required = mmDeployRequired(pWrapper->pDnode); + if (required) { + dInfo("bnode need to be deployed"); + } + + return required; +} + +static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { + SDnode *pDnode = pMgmt->pDnode; + + pOption->pWrapper = pMgmt->pWrapper; + pOption->sendReqFp = dndSendReqToDnode; + pOption->sendMnodeReqFp = dndSendReqToMnode; + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; +} + +int32_t bmOpen(SBnodeMgmt *pMgmt, SMnodeOpt *pOption) { + SDnode *pDnode = pMgmt->pDnode; + + SBnode *pBnode = bmAcquire(pDnode); + if (pBnode != NULL) { + bmRelease(pDnode, pBnode); + terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED; + dError("failed to create bnode since %s", terrstr()); + return -1; + } + + pBnode = bndOpen(pMgmt->path, pOption); + if (pBnode == NULL) { + dError("failed to open bnode since %s", terrstr()); + return -1; + } + + if (bmStartWorker(pDnode) != 0) { + dError("failed to start bnode worker since %s", terrstr()); + bndClose(pBnode); + bndDestroy(pMgmt->path); + return -1; + } + + pMgmt->deployed = 1; + if (bmWriteFile(pDnode) != 0) { + dError("failed to write bnode file since %s", terrstr()); + pMgmt->deployed = 0; + bmStopWorker(pDnode); + bndClose(pBnode); + bndDestroy(pMgmt->path); + return -1; + } + + taosWLockLatch(&pMgmt->latch); + pMgmt->pBnode = pBnode; + pMgmt->deployed = 1; + taosWUnLockLatch(&pMgmt->latch); + + dInfo("bnode open successfully"); + return 0; +} + +int32_t bmDrop(SBnodeMgmt *pMgmt) { + SBnode *pBnode = bmAcquire(pMgmt); + if (pBnode == NULL) { + dError("failed to drop bnode since %s", terrstr()); + return -1; + } + + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 1; + taosRUnLockLatch(&pMgmt->latch); + + if (bmWriteFile(pMgmt) != 0) { + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 0; + taosRUnLockLatch(&pMgmt->latch); + + bmRelease(pMgmt, pBnode); + dError("failed to drop bnode since %s", terrstr()); + return -1; + } + + bmRelease(pMgmt, pBnode); + bmStopWorker(pMgmt); + pMgmt->deployed = 0; + bmWriteFile(pMgmt); + bndClose(pBnode); + pMgmt->pBnode = NULL; + bndDestroy(pMgmt->path); + + return 0; +} + +static int32_t bmInit(SMgmtWrapper *pWrapper) { + SDnode *pDnode = pWrapper->pDnode; + SBnodeMgmt *pMgmt = calloc(1, sizeof(SBnodeMgmt)); + int32_t code = -1; + SBnodeOpt option = {0}; + + dInfo("bnode-mgmt start to init"); + if (pMgmt == NULL) goto _OVER; + + pMgmt->path = pWrapper->path; + pMgmt->pDnode = pWrapper->pDnode; + pMgmt->pWrapper = pWrapper; + taosInitRWLatch(&pMgmt->latch); + + if (bmReadFile(pMgmt) != 0) { + dError("failed to read file since %s", terrstr()); + goto _OVER; + } + + dInfo("bnode start to open"); + bmInitOption(pDnode, &option); + code = bmOpen(pMgmt, &option); + +_OVER: + if (code == 0) { + pWrapper->pMgmt = pMgmt; + dInfo("bnode-mgmt is initialized"); + } else { + dError("failed to init bnode-mgmt since %s", terrstr()); + bmCleanup(pWrapper); + } + + return code; +} + +static void bmCleanup(SMgmtWrapper *pWrapper) { + SBnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; + + dInfo("bnode-mgmt start to cleanup"); + if (pMgmt->pBnode) { + bmStopWorker(pMgmt); + bndClose(pMgmt->pBnode); + pMgmt->pBnode = NULL; + } + free(pMgmt); + pWrapper->pMgmt = NULL; + dInfo("bnode-mgmt is cleaned up"); +} void bmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = NULL; - mgmtFp.closeFp = NULL; - mgmtFp.requiredFp = bmRequireNode; + mgmtFp.openFp = bmInit; + mgmtFp.closeFp = bmCleanup; + mgmtFp.requiredFp = bmRequire; bmInitMsgHandles(pWrapper); - pWrapper->name = "snode"; + pWrapper->name = "bnode"; pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/bnode/src/bmMgmt.c b/source/dnode/mgmt/bnode/src/bmMgmt.c index e6cca1b02c..60f2527657 100644 --- a/source/dnode/mgmt/bnode/src/bmMgmt.c +++ b/source/dnode/mgmt/bnode/src/bmMgmt.c @@ -21,136 +21,8 @@ #if 0 static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); -static SBnode *dndAcquireBnode(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - SBnode *pBnode = NULL; - int32_t refCount = 0; - - taosRLockLatch(&pMgmt->latch); - if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pBnode != NULL) { - refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); - pBnode = pMgmt->pBnode; - } else { - terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; - } - taosRUnLockLatch(&pMgmt->latch); - - if (pBnode != NULL) { - dTrace("acquire bnode, refCount:%d", refCount); - } - return pBnode; -} -static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { - if (pBnode == NULL) return; - - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - taosRLockLatch(&pMgmt->latch); - int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - taosRUnLockLatch(&pMgmt->latch); - dTrace("release bnode, refCount:%d", refCount); -} - -static int32_t dndReadBnodeFile(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 1024; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "r"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); - if (pFile == NULL) { - dDebug("file %s not exist", file); - code = 0; - goto PRASE_BNODE_OVER; - } - - len = (int32_t)taosReadFile(pFile, content, maxLen); - if (len <= 0) { - dError("failed to read %s since content is null", file); - goto PRASE_BNODE_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", file); - goto PRASE_BNODE_OVER; - } - - cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); - if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", file); - goto PRASE_BNODE_OVER; - } - pMgmt->deployed = deployed->valueint; - - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", file); - goto PRASE_BNODE_OVER; - } - pMgmt->dropped = dropped->valueint; - - code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); - -PRASE_BNODE_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (pFile != NULL) taosCloseFile(&pFile); - - terrno = code; - return code; -} - -static int32_t dndWriteBnodeFile(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "w"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; - dError("failed to write %s since %s", file, terrstr()); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 1024; - char *content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); - len += snprintf(content + len, maxLen - len, "}\n"); - - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); - taosCloseFile(&pFile); - free(content); - - char realfile[PATH_MAX + 20]; - snprintf(realfile, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); - - if (taosRenameFile(file, realfile) != 0) { - terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", file, terrstr()); - return -1; - } - - dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); - return 0; -} - -static int32_t dndStartBnodeWorker(SDnode *pDnode) { +static int32_t bmStartWorker(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) { dError("failed to start bnode write worker since %s", terrstr()); @@ -160,7 +32,7 @@ static int32_t dndStartBnodeWorker(SDnode *pDnode) { return 0; } -static void dndStopBnodeWorker(SDnode *pDnode) { +static void bmStopWorker(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; taosWLockLatch(&pMgmt->latch); @@ -174,124 +46,6 @@ static void dndStopBnodeWorker(SDnode *pDnode) { dndCleanupWorker(&pMgmt->writeWorker); } -static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { - pOption->pDnode = pDnode; - pOption->sendReqFp = dndSendReqToDnode; - pOption->sendMnodeReqFp = dndSendReqToMnode; - pOption->sendRedirectRspFp = dmSendRedirectRsp; - pOption->dnodeId = pDnode->dnodeId; - pOption->clusterId = pDnode->clusterId; - pOption->sver = tsVersion; -} - -static int32_t dndOpenBnode(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - SBnode *pBnode = dndAcquireBnode(pDnode); - if (pBnode != NULL) { - dndReleaseBnode(pDnode, pBnode); - terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED; - dError("failed to create bnode since %s", terrstr()); - return -1; - } - - SBnodeOpt option = {0}; - dndBuildBnodeOption(pDnode, &option); - - pBnode = bndOpen(pDnode->dir.bnode, &option); - if (pBnode == NULL) { - dError("failed to open bnode since %s", terrstr()); - return -1; - } - - if (dndStartBnodeWorker(pDnode) != 0) { - dError("failed to start bnode worker since %s", terrstr()); - bndClose(pBnode); - return -1; - } - - pMgmt->deployed = 1; - if (dndWriteBnodeFile(pDnode) != 0) { - pMgmt->deployed = 0; - dError("failed to write bnode file since %s", terrstr()); - dndStopBnodeWorker(pDnode); - bndClose(pBnode); - return -1; - } - - taosWLockLatch(&pMgmt->latch); - pMgmt->pBnode = pBnode; - taosWUnLockLatch(&pMgmt->latch); - - dInfo("bnode open successfully"); - return 0; -} - -static int32_t dndDropBnode(SDnode *pDnode) { - SBnodeMgmt *pMgmt = &pDnode->bmgmt; - - SBnode *pBnode = dndAcquireBnode(pDnode); - if (pBnode == NULL) { - dError("failed to drop bnode since %s", terrstr()); - return -1; - } - - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 1; - taosRUnLockLatch(&pMgmt->latch); - - if (dndWriteBnodeFile(pDnode) != 0) { - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 0; - taosRUnLockLatch(&pMgmt->latch); - - dndReleaseBnode(pDnode, pBnode); - dError("failed to drop bnode since %s", terrstr()); - return -1; - } - - dndReleaseBnode(pDnode, pBnode); - dndStopBnodeWorker(pDnode); - pMgmt->deployed = 0; - dndWriteBnodeFile(pDnode); - bndClose(pBnode); - pMgmt->pBnode = NULL; - bndDestroy(pDnode->dir.bnode); - - return 0; -} - -int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { - SDCreateBnodeReq createReq = {0}; - if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (createReq.dnodeId != pDnode->dnodeId) { - terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; - dError("failed to create bnode since %s", terrstr()); - return -1; - } else { - return dndOpenBnode(pDnode); - } -} - -int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { - SDDropBnodeReq dropReq = {0}; - if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (dropReq.dnodeId != pDnode->dnodeId) { - terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; - dError("failed to drop bnode since %s", terrstr()); - return -1; - } else { - return dndDropBnode(pDnode); - } -} - static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; rpcSendResponse(&rpcRsp); @@ -308,7 +62,7 @@ static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t co } static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { - SBnode *pBnode = dndAcquireBnode(pDnode); + SBnode *pBnode = bmAcquire(pDnode); if (pBnode == NULL) { dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); return; @@ -316,7 +70,7 @@ static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfM SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); if (pArray == NULL) { - dndReleaseBnode(pDnode, pBnode); + bmRelease(pDnode, pBnode); dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); return; } @@ -338,17 +92,17 @@ static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfM taosFreeQitem(pMsg); } taosArrayDestroy(pArray); - dndReleaseBnode(pDnode, pBnode); + bmRelease(pDnode, pBnode); } static void dndWriteBnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { int32_t code = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; - SBnode *pBnode = dndAcquireBnode(pDnode); + SBnode *pBnode = bmAcquire(pDnode); if (pBnode != NULL) { code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); } - dndReleaseBnode(pDnode, pBnode); + bmRelease(pDnode, pBnode); if (code != 0) { if (pMsg->msgType & 1u) { @@ -379,13 +133,13 @@ int32_t dndInitBnode(SDnode *pDnode) { if (!pMgmt->deployed) return 0; - return dndOpenBnode(pDnode); + return bmOpen(pDnode); } void dndCleanupBnode(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; if (pMgmt->pBnode) { - dndStopBnodeWorker(pDnode); + bmStopWorker(pDnode); bndClose(pMgmt->pBnode); pMgmt->pBnode = NULL; } diff --git a/source/dnode/mgmt/bnode/src/bmMsg.c b/source/dnode/mgmt/bnode/src/bmMsg.c index d81e80da74..4a804e6a2c 100644 --- a/source/dnode/mgmt/bnode/src/bmMsg.c +++ b/source/dnode/mgmt/bnode/src/bmMsg.c @@ -16,9 +16,42 @@ #define _DEFAULT_SOURCE #include "bmInt.h" -int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;} -int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;} +int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnode *pDnode = pMgmt->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; + SDCreateBnodeReq createReq = {0}; + if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } -void bmInitMsgHandles(SMgmtWrapper *pWrapper) { + if (createReq.dnodeId != pDnode->dnodeId) { + terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; + dError("failed to create bnode since %s", terrstr()); + return -1; + } else { + return bmOpen(pDnode); + } } + +int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnode *pDnode = pMgmt->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; + + SDDropBnodeReq dropReq = {0}; + if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (dropReq.dnodeId != pDnode->dnodeId) { + terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; + dError("failed to drop bnode since %s", terrstr()); + return -1; + } else { + return bmDrop(pDnode); + } +} + +void bmInitMsgHandles(SMgmtWrapper *pWrapper) {} diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/dnode/mgmt/mnode/src/mmFile.c b/source/dnode/mgmt/mnode/src/mmFile.c index 734654d9d4..e6266e0961 100644 --- a/source/dnode/mgmt/mnode/src/mmFile.c +++ b/source/dnode/mgmt/mnode/src/mmFile.c @@ -150,7 +150,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt) { taosCloseFile(&pFile); free(content); - char realfile[PATH_MAX + 20]; + char realfile[PATH_MAX]; snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); if (taosRenameFile(file, realfile) != 0) { diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index 4305e913b2..f1698f71b1 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -45,9 +45,17 @@ void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode) { } int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { + SMnode *pMnode = mmAcquire(pMgmt); + if (pMnode != NULL) { + mmRelease(pMgmt, pMnode); + terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; + dError("failed to create mnode since %s", terrstr()); + return -1; + } + if (walInit() != 0) { dError("failed to init wal since %s", terrstr()); - dndCleanup(); + mndDestroy(pMgmt->path); return -1; } -- GitLab