提交 f31a7b46 编写于 作者: S Shengliang Guan

shm

上级 565774b9
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_BNODE_H_
#define _TD_DND_BNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern "C" {
#endif
void bmGetMgmtFp(SMgmtWrapper *pWrapper);
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_BNODE_H_*/
\ No newline at end of file
......@@ -16,48 +16,62 @@
#ifndef _TD_DND_BNODE_INT_H_
#define _TD_DND_BNODE_INT_H_
#include "dnd.h"
#include "mm.h"
#include "dm.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SBnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
SProcObj *pProcess;
bool singleProc;
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SRWLatch latch;
SDnodeWorker writeWorker;
} SBnodeMgmt;
void bmGetMgmtFp(SMgmtWrapper *pMgmt);
// mmFile.c
int32_t bmReadFile(SBnodeMgmt *pMgmt);
int32_t bmWriteFile(SBnodeMgmt *pMgmt);
SBnode *bmAcquire(SBnodeMgmt *pMgmt);
void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode);
// SBnode *mmAcquire(SMnodeMgmt *pMgmt);
// void mmRelease(SMnodeMgmt *pMgmt, SBnode *pMnode);
// int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption);
// int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption);
// int32_t mmDrop(SMnodeMgmt *pMgmt);
int32_t dndInitBnode(SDnode *pDnode);
void dndCleanupBnode(SDnode *pDnode);
// void bmGetMgmtFp(SMgmtWrapper *pMgmt);
void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
// int32_t dndInitBnode(SDnode *pDnode);
// void dndCleanupBnode(SDnode *pDnode);
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
// void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
// int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
// int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
int32_t bmStartWorker(SDnode *pDnode);
void bmStopWorker(SDnode *pDnode);
void bmInitMsgFp(SMnodeMgmt *pMgmt);
void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
// void bmInitMsgHandles(SMgmtWrapper *pWrapper);
void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// int32_t bmStartWorker(SDnode *pDnode);
// void bmStopWorker(SDnode *pDnode);
// void bmInitMsgFp(SMnodeMgmt *pMgmt);
// void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
// int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
// int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
// void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
// void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
// void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "bmInt.h"
int32_t bmReadFile(SBnodeMgmt *pMgmt) {
int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 1024;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
char file[PATH_MAX];
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%sbnode.json", pMgmt->path, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_BNODE_OVER;
}
len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) {
dError("failed to read %s since content is null", file);
goto PRASE_BNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", file);
goto PRASE_BNODE_OVER;
}
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", file);
goto PRASE_BNODE_OVER;
}
pMgmt->deployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_BNODE_OVER;
}
pMgmt->dropped = dropped->valueint;
code = 0;
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
PRASE_BNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile);
terrno = code;
return code;
}
int32_t bmWriteFile(SBnodeMgmt *pMgmt) {
char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%sbnode.json", pMgmt->path, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
dError("failed to write %s since %s", file, terrstr());
return -1;
}
int32_t len = 0;
int32_t maxLen = 1024;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, "}\n");
taosWriteFile(pFile, content, len);
taosFsyncFile(pFile);
taosCloseFile(&pFile);
free(content);
char realfile[PATH_MAX];
snprintf(realfile, sizeof(realfile), "%s%sbnode.json", pMgmt->path);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", file, terrstr());
return -1;
}
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0;
}
......@@ -16,15 +16,202 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
bool bmRequireNode(SMgmtWrapper *pWrapper) { return false; }
SBnode *bmAcquire(SBnodeMgmt *pMgmt) {
SBnode *pBnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pBnode != NULL) {
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
pBnode = pMgmt->pBnode;
} else {
terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
}
taosRUnLockLatch(&pMgmt->latch);
if (pBnode != NULL) {
dTrace("acquire bnode, refCount:%d", refCount);
}
return pBnode;
}
void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode) {
if (pBnode == NULL) return;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
dTrace("release bnode, refCount:%d", refCount);
}
static bool bmRequire(SMgmtWrapper *pWrapper) {
SBnodeMgmt mgmt = {0};
mgmt.path = pWrapper->path;
if (mmReadFile(&mgmt) != 0) {
return false;
}
if (mgmt.dropped) {
dInfo("bnode has been dropped and needs to be deleted");
mndDestroy(mgmt.path);
return false;
}
if (mgmt.deployed) {
dInfo("bnode has been deployed");
return true;
}
bool required = mmDeployRequired(pWrapper->pDnode);
if (required) {
dInfo("bnode need to be deployed");
}
return required;
}
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
}
int32_t bmOpen(SBnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
SBnode *pBnode = bmAcquire(pDnode);
if (pBnode != NULL) {
bmRelease(pDnode, pBnode);
terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;
dError("failed to create bnode since %s", terrstr());
return -1;
}
pBnode = bndOpen(pMgmt->path, pOption);
if (pBnode == NULL) {
dError("failed to open bnode since %s", terrstr());
return -1;
}
if (bmStartWorker(pDnode) != 0) {
dError("failed to start bnode worker since %s", terrstr());
bndClose(pBnode);
bndDestroy(pMgmt->path);
return -1;
}
pMgmt->deployed = 1;
if (bmWriteFile(pDnode) != 0) {
dError("failed to write bnode file since %s", terrstr());
pMgmt->deployed = 0;
bmStopWorker(pDnode);
bndClose(pBnode);
bndDestroy(pMgmt->path);
return -1;
}
taosWLockLatch(&pMgmt->latch);
pMgmt->pBnode = pBnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch);
dInfo("bnode open successfully");
return 0;
}
int32_t bmDrop(SBnodeMgmt *pMgmt) {
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode == NULL) {
dError("failed to drop bnode since %s", terrstr());
return -1;
}
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 1;
taosRUnLockLatch(&pMgmt->latch);
if (bmWriteFile(pMgmt) != 0) {
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 0;
taosRUnLockLatch(&pMgmt->latch);
bmRelease(pMgmt, pBnode);
dError("failed to drop bnode since %s", terrstr());
return -1;
}
bmRelease(pMgmt, pBnode);
bmStopWorker(pMgmt);
pMgmt->deployed = 0;
bmWriteFile(pMgmt);
bndClose(pBnode);
pMgmt->pBnode = NULL;
bndDestroy(pMgmt->path);
return 0;
}
static int32_t bmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SBnodeMgmt *pMgmt = calloc(1, sizeof(SBnodeMgmt));
int32_t code = -1;
SBnodeOpt option = {0};
dInfo("bnode-mgmt start to init");
if (pMgmt == NULL) goto _OVER;
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pWrapper->pDnode;
pMgmt->pWrapper = pWrapper;
taosInitRWLatch(&pMgmt->latch);
if (bmReadFile(pMgmt) != 0) {
dError("failed to read file since %s", terrstr());
goto _OVER;
}
dInfo("bnode start to open");
bmInitOption(pDnode, &option);
code = bmOpen(pMgmt, &option);
_OVER:
if (code == 0) {
pWrapper->pMgmt = pMgmt;
dInfo("bnode-mgmt is initialized");
} else {
dError("failed to init bnode-mgmt since %s", terrstr());
bmCleanup(pWrapper);
}
return code;
}
static void bmCleanup(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("bnode-mgmt start to cleanup");
if (pMgmt->pBnode) {
bmStopWorker(pMgmt);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("bnode-mgmt is cleaned up");
}
void bmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = bmRequireNode;
mgmtFp.openFp = bmInit;
mgmtFp.closeFp = bmCleanup;
mgmtFp.requiredFp = bmRequire;
bmInitMsgHandles(pWrapper);
pWrapper->name = "snode";
pWrapper->name = "bnode";
pWrapper->fp = mgmtFp;
}
......@@ -21,136 +21,8 @@
#if 0
static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
static SBnode *dndAcquireBnode(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
SBnode *pBnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pBnode != NULL) {
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
pBnode = pMgmt->pBnode;
} else {
terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
}
taosRUnLockLatch(&pMgmt->latch);
if (pBnode != NULL) {
dTrace("acquire bnode, refCount:%d", refCount);
}
return pBnode;
}
static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) {
if (pBnode == NULL) return;
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
dTrace("release bnode, refCount:%d", refCount);
}
static int32_t dndReadBnodeFile(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 1024;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
char file[PATH_MAX + 20];
snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode);
// FILE *fp = fopen(file, "r");
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_BNODE_OVER;
}
len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) {
dError("failed to read %s since content is null", file);
goto PRASE_BNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", file);
goto PRASE_BNODE_OVER;
}
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", file);
goto PRASE_BNODE_OVER;
}
pMgmt->deployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_BNODE_OVER;
}
pMgmt->dropped = dropped->valueint;
code = 0;
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
PRASE_BNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile);
terrno = code;
return code;
}
static int32_t dndWriteBnodeFile(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
char file[PATH_MAX + 20];
snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode);
// FILE *fp = fopen(file, "w");
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
dError("failed to write %s since %s", file, terrstr());
return -1;
}
int32_t len = 0;
int32_t maxLen = 1024;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, "}\n");
taosWriteFile(pFile, content, len);
taosFsyncFile(pFile);
taosCloseFile(&pFile);
free(content);
char realfile[PATH_MAX + 20];
snprintf(realfile, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", file, terrstr());
return -1;
}
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0;
}
static int32_t dndStartBnodeWorker(SDnode *pDnode) {
static int32_t bmStartWorker(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) {
dError("failed to start bnode write worker since %s", terrstr());
......@@ -160,7 +32,7 @@ static int32_t dndStartBnodeWorker(SDnode *pDnode) {
return 0;
}
static void dndStopBnodeWorker(SDnode *pDnode) {
static void bmStopWorker(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
taosWLockLatch(&pMgmt->latch);
......@@ -174,124 +46,6 @@ static void dndStopBnodeWorker(SDnode *pDnode) {
dndCleanupWorker(&pMgmt->writeWorker);
}
static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
pOption->sver = tsVersion;
}
static int32_t dndOpenBnode(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
SBnode *pBnode = dndAcquireBnode(pDnode);
if (pBnode != NULL) {
dndReleaseBnode(pDnode, pBnode);
terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;
dError("failed to create bnode since %s", terrstr());
return -1;
}
SBnodeOpt option = {0};
dndBuildBnodeOption(pDnode, &option);
pBnode = bndOpen(pDnode->dir.bnode, &option);
if (pBnode == NULL) {
dError("failed to open bnode since %s", terrstr());
return -1;
}
if (dndStartBnodeWorker(pDnode) != 0) {
dError("failed to start bnode worker since %s", terrstr());
bndClose(pBnode);
return -1;
}
pMgmt->deployed = 1;
if (dndWriteBnodeFile(pDnode) != 0) {
pMgmt->deployed = 0;
dError("failed to write bnode file since %s", terrstr());
dndStopBnodeWorker(pDnode);
bndClose(pBnode);
return -1;
}
taosWLockLatch(&pMgmt->latch);
pMgmt->pBnode = pBnode;
taosWUnLockLatch(&pMgmt->latch);
dInfo("bnode open successfully");
return 0;
}
static int32_t dndDropBnode(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
SBnode *pBnode = dndAcquireBnode(pDnode);
if (pBnode == NULL) {
dError("failed to drop bnode since %s", terrstr());
return -1;
}
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 1;
taosRUnLockLatch(&pMgmt->latch);
if (dndWriteBnodeFile(pDnode) != 0) {
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 0;
taosRUnLockLatch(&pMgmt->latch);
dndReleaseBnode(pDnode, pBnode);
dError("failed to drop bnode since %s", terrstr());
return -1;
}
dndReleaseBnode(pDnode, pBnode);
dndStopBnodeWorker(pDnode);
pMgmt->deployed = 0;
dndWriteBnodeFile(pDnode);
bndClose(pBnode);
pMgmt->pBnode = NULL;
bndDestroy(pDnode->dir.bnode);
return 0;
}
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateBnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to create bnode since %s", terrstr());
return -1;
} else {
return dndOpenBnode(pDnode);
}
}
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropBnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
} else {
return dndDropBnode(pDnode);
}
}
static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rpcRsp);
......@@ -308,7 +62,7 @@ static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t co
}
static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
SBnode *pBnode = dndAcquireBnode(pDnode);
SBnode *pBnode = bmAcquire(pDnode);
if (pBnode == NULL) {
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return;
......@@ -316,7 +70,7 @@ static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfM
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) {
dndReleaseBnode(pDnode, pBnode);
bmRelease(pDnode, pBnode);
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return;
}
......@@ -338,17 +92,17 @@ static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfM
taosFreeQitem(pMsg);
}
taosArrayDestroy(pArray);
dndReleaseBnode(pDnode, pBnode);
bmRelease(pDnode, pBnode);
}
static void dndWriteBnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
SBnode *pBnode = dndAcquireBnode(pDnode);
SBnode *pBnode = bmAcquire(pDnode);
if (pBnode != NULL) {
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
}
dndReleaseBnode(pDnode, pBnode);
bmRelease(pDnode, pBnode);
if (code != 0) {
if (pMsg->msgType & 1u) {
......@@ -379,13 +133,13 @@ int32_t dndInitBnode(SDnode *pDnode) {
if (!pMgmt->deployed) return 0;
return dndOpenBnode(pDnode);
return bmOpen(pDnode);
}
void dndCleanupBnode(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
if (pMgmt->pBnode) {
dndStopBnodeWorker(pDnode);
bmStopWorker(pDnode);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
......
......@@ -16,9 +16,42 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;}
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;}
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateBnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void bmInitMsgHandles(SMgmtWrapper *pWrapper) {
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to create bnode since %s", terrstr());
return -1;
} else {
return bmOpen(pDnode);
}
}
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropBnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
} else {
return bmDrop(pDnode);
}
}
void bmInitMsgHandles(SMgmtWrapper *pWrapper) {}
......@@ -150,7 +150,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt) {
taosCloseFile(&pFile);
free(content);
char realfile[PATH_MAX + 20];
char realfile[PATH_MAX];
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) {
......
......@@ -45,9 +45,17 @@ void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode) {
}
int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode != NULL) {
mmRelease(pMgmt, pMnode);
terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
dError("failed to create mnode since %s", terrstr());
return -1;
}
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
dndCleanup();
mndDestroy(pMgmt->path);
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册