提交 4131fbea 编写于 作者: S Shengliang Guan

shm

上级 846129fd
...@@ -2327,7 +2327,7 @@ struct SMgmtWrapper; ...@@ -2327,7 +2327,7 @@ struct SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg); typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
typedef int32_t (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -86,10 +86,6 @@ int32_t* taosGetErrno(); ...@@ -86,10 +86,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121) #define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121)
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122) #define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122)
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123) #define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123)
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0124)
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0125)
#define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0126)
#define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0127)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
...@@ -282,19 +278,14 @@ int32_t* taosGetErrno(); ...@@ -282,19 +278,14 @@ int32_t* taosGetErrno();
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401) #define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402) #define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402)
#define TSDB_CODE_DND_DNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0410) #define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403)
#define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411) #define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404)
#define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0420) #define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0405)
#define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0421) #define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0406)
#define TSDB_CODE_DND_MNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0422) #define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0410)
#define TSDB_CODE_DND_MNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0423) #define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411)
#define TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0424) #define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0412)
#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0460) #define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0413)
#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0461)
#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0462)
#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0463)
#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0464)
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0465)
// vnode // vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500)
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_DND_BNODE_INT_H_ #define _TD_DND_BNODE_INT_H_
#include "bm.h" #include "bm.h"
#include "bnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -23,6 +23,7 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { ...@@ -23,6 +23,7 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
pOption->pWrapper = pMgmt->pWrapper; pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
} }
......
...@@ -36,13 +36,8 @@ ...@@ -36,13 +36,8 @@
#include "tworker.h" #include "tworker.h"
#include "dnode.h" #include "dnode.h"
#include "bnode.h"
#include "mnode.h"
#include "qnode.h"
#include "snode.h"
#include "tfs.h" #include "tfs.h"
#include "vnode.h" #include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -293,7 +293,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { ...@@ -293,7 +293,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
} }
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { if (pRsp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp); dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
} else { } else {
......
...@@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep); ...@@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep);
static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps); static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps);
int32_t dmReadFile(SDnodeMgmt *pMgmt) { int32_t dmReadFile(SDnodeMgmt *pMgmt) {
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 256 * 1024; int32_t maxLen = 256 * 1024;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
...@@ -203,7 +203,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) { ...@@ -203,7 +203,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) { if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; return -1;
} }
......
...@@ -23,10 +23,6 @@ extern "C" { ...@@ -23,10 +23,6 @@ extern "C" {
#endif #endif
void mmGetMgmtFp(SMgmtWrapper *pMgmt); void mmGetMgmtFp(SMgmtWrapper *pMgmt);
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
......
...@@ -16,43 +16,39 @@ ...@@ -16,43 +16,39 @@
#ifndef _TD_DND_MNODE_INT_H_ #ifndef _TD_DND_MNODE_INT_H_
#define _TD_DND_MNODE_INT_H_ #define _TD_DND_MNODE_INT_H_
#include "dm.h"
#include "mm.h" #include "mm.h"
#include "mnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct SMnodeMgmt { typedef struct SMnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SMnode *pMnode; SMnode *pMnode;
SDnode *pDnode; SDnode *pDnode;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
const char *path; const char *path;
SRWLatch latch;
SDnodeWorker readWorker; SDnodeWorker readWorker;
SDnodeWorker writeWorker; SDnodeWorker writeWorker;
SDnodeWorker syncWorker; SDnodeWorker syncWorker;
SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica;
int8_t selfIndex;
} SMnodeMgmt; } SMnodeMgmt;
// mmFile.c // mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt); int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt); int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed);
// mmInt.c // mmInt.c
SMnode *mmAcquire(SMnodeMgmt *pMgmt); int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq);
void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode); int32_t mmDrop(SMgmtWrapper *pWrapper);
int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption); int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption);
int32_t mmDrop(SMnodeMgmt *pMgmt);
int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate);
// mmMsg.c // mmMsg.c
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
// mmWorker.c // mmWorker.c
...@@ -61,7 +57,6 @@ void mmStopWorker(SMnodeMgmt *pMgmt); ...@@ -61,7 +57,6 @@ void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg);
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
int32_t mmReadFile(SMnodeMgmt *pMgmt) { int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 4096; int32_t maxLen = 4096;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
...@@ -51,14 +51,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt) { ...@@ -51,14 +51,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt) {
dError("failed to read %s since deployed not found", file); dError("failed to read %s since deployed not found", file);
goto PRASE_MNODE_OVER; goto PRASE_MNODE_OVER;
} }
pMgmt->deployed = deployed->valueint; *pDeployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_MNODE_OVER;
}
pMgmt->dropped = dropped->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (!mnodes || mnodes->type != cJSON_Array) { if (!mnodes || mnodes->type != cJSON_Array) {
...@@ -101,7 +94,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt) { ...@@ -101,7 +94,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt) {
} }
code = 0; code = 0;
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
PRASE_MNODE_OVER: PRASE_MNODE_OVER:
if (content != NULL) free(content); if (content != NULL) free(content);
...@@ -112,13 +105,13 @@ PRASE_MNODE_OVER: ...@@ -112,13 +105,13 @@ PRASE_MNODE_OVER:
return code; return code;
} }
int32_t mmWriteFile(SMnodeMgmt *pMgmt) { int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
char file[PATH_MAX]; char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; terrno = TAOS_SYSTEM_ERROR(errno);;
dError("failed to write %s since %s", file, terrstr()); dError("failed to write %s since %s", file, terrstr());
return -1; return -1;
} }
...@@ -128,9 +121,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt) { ...@@ -128,9 +121,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt) {
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", deployed);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < pMgmt->replica; ++i) { for (int32_t i = 0; i < pMgmt->replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i]; SReplica *pReplica = &pMgmt->replicas[i];
...@@ -154,11 +145,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt) { ...@@ -154,11 +145,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt) {
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) { if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; terrno = TAOS_SYSTEM_ERROR(errno);;
dError("failed to rename %s since %s", file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; return -1;
} }
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); dInfo("successed to write %s, deployed:%d", realfile, deployed);
return 0; return 0;
} }
...@@ -16,138 +16,35 @@ ...@@ -16,138 +16,35 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
SMnode *mmAcquire(SMnodeMgmt *pMgmt) { static bool mmDeployRequired(SDnode *pDnode) {
SMnode *pMnode = NULL; if (pDnode->dnodeId > 0) return false;
int32_t refCount = 0; if (pDnode->clusterId > 0) return false;
if (strcmp(pDnode->localEp, pDnode->firstEp) != 0) return false;
taosRLockLatch(&pMgmt->latch); return true;
if (pMgmt->deployed && !pMgmt->dropped) {
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
pMnode = pMgmt->pMnode;
} else {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
}
taosRUnLockLatch(&pMgmt->latch);
if (pMnode != NULL) {
dTrace("acquire mnode, refCount:%d", refCount);
}
return pMnode;
}
void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode) {
if (pMnode == NULL) return;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
dTrace("release mnode, refCount:%d", refCount);
}
int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode != NULL) {
mmRelease(pMgmt, pMnode);
terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
dError("failed to create mnode since %s", terrstr());
return -1;
}
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
mndDestroy(pMgmt->path);
return -1;
}
pMnode = mndOpen(pMgmt->path, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
return -1;
}
if (mmStartWorker(pMgmt) != 0) {
dError("failed to start mnode worker since %s", terrstr());
mndClose(pMnode);
mndDestroy(pMgmt->path);
return -1;
}
pMgmt->deployed = 1;
if (mmWriteFile(pMgmt) != 0) {
dError("failed to write mnode file since %s", terrstr());
pMgmt->deployed = 0;
mmStopWorker(pMgmt);
mndClose(pMnode);
mndDestroy(pMgmt->path);
return -1;
}
taosWLockLatch(&pMgmt->latch);
pMgmt->pMnode = pMnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch);
dInfo("mnode open successfully");
return 0;
}
int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) {
dError("failed to alter mnode since %s", terrstr());
return -1;
}
if (mndAlter(pMnode, pOption) != 0) {
dError("failed to alter mnode since %s", terrstr());
mmRelease(pMgmt, pMnode);
return -1;
}
mmRelease(pMgmt, pMnode);
return 0;
} }
int32_t mmDrop(SMnodeMgmt *pMgmt) { static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
SMnode *pMnode = mmAcquire(pMgmt); SMnodeMgmt mgmt = {0};
if (pMnode == NULL) { mgmt.path = pWrapper->path;
dError("failed to drop mnode since %s", terrstr()); if (mmReadFile(&mgmt, required) != 0) {
return -1; return -1;
} }
taosRLockLatch(&pMgmt->latch); if (!(*required)) {
pMgmt->dropped = 1; *required = mmDeployRequired(pWrapper->pDnode);
taosRUnLockLatch(&pMgmt->latch);
if (mmWriteFile(pMgmt) != 0) {
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 0;
taosRUnLockLatch(&pMgmt->latch);
mmRelease(pMgmt, pMnode);
dError("failed to drop mnode since %s", terrstr());
return -1;
} }
mmRelease(pMgmt, pMnode);
mmStopWorker(pMgmt);
pMgmt->deployed = 0;
mmWriteFile(pMgmt);
mndClose(pMnode);
pMgmt->pMnode = NULL;
mndDestroy(pMgmt->path);
return 0; return 0;
} }
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper; pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->putToWriteQFp = mmPutMsgToWriteQueue; pOption->putToWriteQFp = mmPutMsgToWriteQueue;
pOption->putToReadQFp = mmPutMsgToReadQueue; pOption->putToReadQFp = mmPutMsgToReadQueue;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
} }
...@@ -175,12 +72,8 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -175,12 +72,8 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
} }
int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
SDnode *pDnode = pMgmt->pDnode;
mmInitOption(pMgmt, pOption); mmInitOption(pMgmt, pOption);
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
pOption->replica = pCreate->replica; pOption->replica = pCreate->replica;
pOption->selfIndex = -1; pOption->selfIndex = -1;
...@@ -205,109 +98,136 @@ int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnod ...@@ -205,109 +98,136 @@ int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnod
return 0; return 0;
} }
static void mmCleanup(SMgmtWrapper *pWrapper) { static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("mnode-mgmt start to cleanup");
if (pMgmt->pMnode) {
mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode);
pMgmt->pMnode = NULL;
}
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("mnode-mgmt is cleaned up");
}
static int32_t mmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SMnodeMgmt *pMgmt = calloc(1, sizeof(SMnodeMgmt));
int32_t code = -1;
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (pReq != NULL) {
dInfo("mnode-mgmt start to init"); if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
if (pMgmt == NULL) goto _OVER; return -1;
}
pMgmt->path = pWrapper->path; } else {
pMgmt->pDnode = pWrapper->pDnode; bool deployed = false;
pMgmt->pWrapper = pWrapper; if (mmReadFile(pMgmt, &deployed) != 0) {
taosInitRWLatch(&pMgmt->latch);
if (mmReadFile(pMgmt) != 0) {
dError("failed to read file since %s", terrstr()); dError("failed to read file since %s", terrstr());
goto _OVER; return -1;
} }
if (!pMgmt->deployed) { if (!deployed) {
dInfo("mnode start to deploy"); dInfo("mnode start to deploy");
mmBuildOptionForDeploy(pMgmt, &option); mmBuildOptionForDeploy(pMgmt, &option);
code = mmOpen(pMgmt, &option);
} else { } else {
dInfo("mnode start to open"); dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option); mmBuildOptionForOpen(pMgmt, &option);
code = mmOpen(pMgmt, &option); }
} }
_OVER: pMgmt->pMnode = mndOpen(pMgmt->path, &option);
if (code == 0) { if (pMgmt->pMnode == NULL) {
pWrapper->pMgmt = pMgmt; dError("failed to open mnode since %s", terrstr());
dInfo("mnode-mgmt is initialized"); return -1;
} else {
dError("failed to init mnode-mgmt since %s", terrstr());
mmCleanup(pWrapper);
} }
return code; if (mmStartWorker(pMgmt) != 0) {
} dError("failed to start mnode worker since %s", terrstr());
return -1;
}
static bool mmDeployRequired(SDnode *pDnode) { bool deployed = true;
if (pDnode->dnodeId > 0) { if (dndWriteFile(pMgmt->pWrapper, deployed) != 0) {
return false; dError("failed to write mnode file since %s", terrstr());
return -1;
} }
if (pDnode->clusterId > 0) { return 0;
return false; }
static void mmCloseImp(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode);
pMgmt->pMnode = NULL;
} }
}
if (strcmp(pDnode->localEp, pDnode->firstEp) != 0) { int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
return false; SMnodeOpt option = {0};
if (pReq != NULL) {
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
return -1;
}
} }
return true; return mndAlter(pMgmt->pMnode, &option);
} }
static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { int32_t mmDrop(SMgmtWrapper *pWrapper) {
SMnodeMgmt mgmt = {0}; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mgmt.path = pWrapper->path; if (pMgmt == NULL) return 0;
if (mmReadFile(&mgmt) != 0) {
dInfo("mnode-mgmt start to drop");
bool deployed = false;
if (mmWriteFile(pMgmt, deployed) != 0) {
dError("failed to drop mnode since %s", terrstr());
return -1; return -1;
} }
if (mgmt.dropped) { mmCloseImp(pMgmt);
dInfo("mnode has been dropped and needs to be deleted"); taosRemoveDir(pMgmt->path);
mndDestroy(mgmt.path); pWrapper->pMgmt = NULL;
free(pMgmt);
dInfo("mnode-mgmt is dropped");
return 0;
}
static void mmClose(SMgmtWrapper *pWrapper) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("mnode-mgmt start to cleanup");
mmCloseImp(pMgmt);
pWrapper->pMgmt = NULL;
free(pMgmt);
dInfo("mnode-mgmt is cleaned up");
}
int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) {
dInfo("mnode-mgmt start to init");
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
return -1; return -1;
} }
if (mgmt.deployed) { SMnodeMgmt *pMgmt = calloc(1, sizeof(SMnodeMgmt));
*required = true; if (pMgmt == NULL) {
dInfo("mnode has been deployed"); terrno = TSDB_CODE_OUT_OF_MEMORY;
return 0; return -1;
} }
*required = mmDeployRequired(pWrapper->pDnode); pMgmt->path = pWrapper->path;
if (*required) { pMgmt->pDnode = pWrapper->pDnode;
dInfo("mnode need to be deployed"); pMgmt->pWrapper = pWrapper;
pWrapper->pMgmt = pMgmt;
int32_t code = mmOpenImp(pMgmt, pReq);
if (code != 0) {
dError("failed to init mnode-mgmt since %s", terrstr());
mmClose(pWrapper);
} else {
dInfo("mnode-mgmt is initialized");
} }
return 0; return code;
}
static int32_t mmOpen(SMgmtWrapper *pWrapper) {
return mmOpenFromMsg(pWrapper, NULL);
} }
void mmGetMgmtFp(SMgmtWrapper *pWrapper) { void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0}; SMgmtFp mgmtFp = {0};
mgmtFp.openFp = mmInit; mgmtFp.openFp = mmOpen;
mgmtFp.closeFp = mmCleanup; mgmtFp.closeFp = mmClose;
mgmtFp.createMsgFp = mmProcessCreateReq;
mgmtFp.dropMsgFp = mmProcessDropReq;
mgmtFp.requiredFp = mmRequire; mgmtFp.requiredFp = mmRequire;
mmInitMsgHandles(pWrapper); mmInitMsgHandles(pWrapper);
...@@ -318,16 +238,7 @@ void mmGetMgmtFp(SMgmtWrapper *pWrapper) { ...@@ -318,16 +238,7 @@ void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SMnode *pMnode = mmAcquire(pMgmt); int32_t code = mndRetriveAuth(pMgmt->pMnode, user, spi, encrypt, secret, ckey);
if (pMnode == NULL) {
terrno = TSDB_CODE_APP_NOT_READY;
dTrace("failed to get user auth since %s", terrstr());
return -1;
}
int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
mmRelease(pMgmt, pMnode);
dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt);
return code; return code;
} }
...@@ -335,10 +246,5 @@ int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encry ...@@ -335,10 +246,5 @@ int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encry
int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SMnode *pMnode = mmAcquire(pMgmt); return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
if (pMnode == NULL) return -1;
int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
mmRelease(pMgmt, pMnode);
return code;
} }
\ No newline at end of file
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateMnodeReq createReq = {0}; SDCreateMnodeReq createReq = {0};
...@@ -27,28 +27,31 @@ int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -27,28 +27,31 @@ int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
} }
if (createReq.replica <= 1 || createReq.dnodeId != pDnode->dnodeId) { if (createReq.replica <= 1 || createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_NODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
} else {
return mmOpenFromMsg(pWrapper, &createReq);
} }
}
SMnodeOpt option = {0}; int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
if (mmBuildOptionFromReq(pMgmt, &option, &createReq) != 0) { SDnode *pDnode = pWrapper->pDnode;
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; SRpcMsg *pReq = &pMsg->rpcMsg;
dError("failed to create mnode since %s", terrstr());
SDDropMnodeReq dropReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
SMnode *pMnode = mmAcquire(pMgmt); if (dropReq.dnodeId != pDnode->dnodeId) {
if (pMnode != NULL) { terrno = TSDB_CODE_NODE_INVALID_OPTION;
mmRelease(pMgmt, pMnode); dError("failed to drop mnode since %s", terrstr());
terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
dError("failed to create mnode since %s", terrstr());
return -1; return -1;
} else {
return mmDrop(pWrapper);
} }
dDebug("start to create mnode");
return mmOpen(pMgmt, &option);
} }
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
...@@ -62,60 +65,12 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -62,60 +65,12 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
} }
if (alterReq.dnodeId != pDnode->dnodeId) { if (alterReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_NODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr());
return -1;
}
SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, &alterReq) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr()); dError("failed to alter mnode since %s", terrstr());
return -1; return -1;
} else {
return mmAlter(pMgmt, &alterReq);
} }
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
dError("failed to alter mnode since %s", terrstr());
return -1;
}
dDebug("start to alter mnode");
int32_t code = mmAlter(pMgmt, &option);
mmRelease(pMgmt, pMnode);
return code;
}
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropMnodeReq dropReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr());
return -1;
}
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
dError("failed to drop mnode since %s", terrstr());
return -1;
}
dDebug("start to drop mnode");
int32_t code = mmDrop(pMgmt);
mmRelease(pMgmt, pMnode);
return code;
} }
void mmInitMsgHandles(SMgmtWrapper *pWrapper) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
......
...@@ -18,27 +18,22 @@ ...@@ -18,27 +18,22 @@
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in mnode queue", pMsg); dTrace("msg:%p, will be processed in mnode queue", pMsg);
SMnode *pMnode = mmAcquire(pMgmt);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
bool isReq = (pRpc->msgType & 1U);
int32_t code = -1; int32_t code = -1;
if (pMnode != NULL) { if (pMsg->rpcMsg.msgType != TDMT_DND_ALTER_MNODE) {
pMsg->pNode = pMnode; pMsg->pNode = pMgmt->pMnode;
code = mndProcessMsg(pMsg); code = mndProcessMsg(pMsg);
mmRelease(pMgmt, pMnode); } else {
code = mmProcessAlterReq(pMgmt, pMsg);
} }
if (isReq) { if (pRpc->msgType & 1U) {
if (pMsg->rpcMsg.handle == NULL) return; if (pRpc->handle == NULL) return;
if (code == 0) { if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
dndSendRsp(pMgmt->pWrapper, &rsp); dndSendRsp(pMgmt->pWrapper, &rsp);
} else {
if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = terrno};
dndSendRsp(pMgmt->pWrapper, &rsp);
}
} }
} }
...@@ -67,27 +62,14 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -67,27 +62,14 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
} }
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 1) {
taosMsleep(10);
}
dndCleanupWorker(&pMgmt->readWorker); dndCleanupWorker(&pMgmt->readWorker);
dndCleanupWorker(&pMgmt->writeWorker); dndCleanupWorker(&pMgmt->writeWorker);
dndCleanupWorker(&pMgmt->syncWorker); dndCleanupWorker(&pMgmt->syncWorker);
} }
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
SMnode *pMnode = mmAcquire(pMgmt);
if (pMnode == NULL) return -1;
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
int32_t code = dndWriteMsgToWorker(pWorker, pMsg); return dndWriteMsgToWorker(pWorker, pMsg);
mmRelease(pMgmt, pMnode);
return code;
} }
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_DND_QNODE_INT_H_ #define _TD_DND_QNODE_INT_H_
#include "qm.h" #include "qm.h"
#include "qnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { ...@@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
pOption->pWrapper = pMgmt->pWrapper; pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_DND_SNODE_INT_H_ #define _TD_DND_SNODE_INT_H_
#include "sm.h" #include "sm.h"
#include "snode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -23,6 +23,7 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { ...@@ -23,6 +23,7 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
pOption->pWrapper = pMgmt->pWrapper; pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
} }
......
...@@ -40,7 +40,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ...@@ -40,7 +40,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_INVALID_OPTION);
} }
{ {
...@@ -57,7 +57,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ...@@ -57,7 +57,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_INVALID_OPTION);
} }
{ {
...@@ -77,7 +77,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ...@@ -77,7 +77,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
} }
} }
...@@ -96,7 +96,7 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) { ...@@ -96,7 +96,7 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_INVALID_OPTION);
} }
{ {
...@@ -113,7 +113,7 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) { ...@@ -113,7 +113,7 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_INVALID_OPTION);
} }
{ {
...@@ -145,7 +145,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -145,7 +145,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_INVALID_OPTION);
} }
{ {
...@@ -171,7 +171,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -171,7 +171,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
} }
{ {
...@@ -188,7 +188,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -188,7 +188,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
} }
{ {
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
#ifndef _TD_DND_VNODES_INT_H_ #ifndef _TD_DND_VNODES_INT_H_
#define _TD_DND_VNODES_INT_H_ #define _TD_DND_VNODES_INT_H_
#include "dm.h"
#include "sync.h" #include "sync.h"
#include "vm.h" #include "vm.h"
#include "vnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -45,7 +45,7 @@ SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { ...@@ -45,7 +45,7 @@ SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) {
} }
int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
......
...@@ -320,7 +320,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -320,7 +320,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_MNODE; action.msgType = TDMT_DND_ALTER_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pReq); free(pReq);
...@@ -345,7 +345,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -345,7 +345,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_MNODE; action.msgType = TDMT_DND_CREATE_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pReq); free(pReq);
return -1; return -1;
...@@ -490,7 +490,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -490,7 +490,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_MNODE; action.msgType = TDMT_DND_ALTER_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pReq); free(pReq);
...@@ -517,7 +517,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -517,7 +517,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_DROP_MNODE; action.msgType = TDMT_DND_DROP_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pReq); free(pReq);
return -1; return -1;
......
...@@ -69,10 +69,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") ...@@ -69,10 +69,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_ALREADY_DEPLOYED, "Node already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_NOT_DEPLOYED, "Node not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_PARSE_FILE_ERROR, "Invalid json format")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_INVALID_OPTION, "Invalid node option")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported") TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
...@@ -278,18 +274,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregatio ...@@ -278,18 +274,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregatio
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length") TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_READ_FILE_ERROR, "Read dnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_NODE_ALREADY_DEPLOYED, "Node already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR, "Write dnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_NODE_NOT_DEPLOYED, "Node not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED, "Mnode already deployed") TAOS_DEFINE_ERROR(TSDB_CODE_NODE_PARSE_FILE_ERROR, "Invalid json format")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_NOT_DEPLOYED, "Mnode not deployed") TAOS_DEFINE_ERROR(TSDB_CODE_NODE_INVALID_OPTION, "Invalid node option")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_INVALID_OPTION, "Mnode option invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_READ_FILE_ERROR, "Read mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR, "Write mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED, "Vnode already deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED, "Vnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_NOT_DEPLOYED, "Vnode not deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_NOT_DEPLOYED, "Vnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_INVALID_OPTION, "Vnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_INVALID_OPTION, "Vnode option invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_READ_FILE_ERROR, "Read vnodes.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR, "Write vnodes.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnodes") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnodes")
// vnode // vnode
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册