提交 1649d0dd 编写于 作者: S Shengliang Guan

shm

上级 13e6efe3
...@@ -29,8 +29,7 @@ extern "C" { ...@@ -29,8 +29,7 @@ extern "C" {
typedef struct SMnode SMnode; typedef struct SMnode SMnode;
typedef struct { typedef struct {
int32_t dnodeId; bool deploy;
int64_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
......
...@@ -111,7 +111,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { ...@@ -111,7 +111,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);; terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr()); dError("failed to write %s since %s", file, terrstr());
return -1; return -1;
} }
...@@ -145,7 +145,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { ...@@ -145,7 +145,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) { if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);; terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; return -1;
} }
......
...@@ -39,10 +39,6 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { ...@@ -39,10 +39,6 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
} }
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
...@@ -66,6 +62,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -66,6 +62,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pDnode->serverPort; pReplica->port = pDnode->serverPort;
tstrncpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN);
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica; pMgmt->replica = pOption->replica;
...@@ -77,6 +74,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -77,6 +74,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->selfIndex = pMgmt->selfIndex; pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica; pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pOption->deploy = false;
} }
static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
...@@ -89,7 +87,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre ...@@ -89,7 +87,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
pReplica->id = pCreate->replicas[i].id; pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port; pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) { if (pReplica->id == pMgmt->pDnode->dnodeId) {
pOption->selfIndex = i; pOption->selfIndex = i;
} }
} }
...@@ -98,6 +96,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre ...@@ -98,6 +96,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
dError("failed to build mnode options since %s", terrstr()); dError("failed to build mnode options since %s", terrstr());
return -1; return -1;
} }
pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica; pMgmt->replica = pOption->replica;
...@@ -225,9 +224,7 @@ int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) { ...@@ -225,9 +224,7 @@ int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) {
return code; return code;
} }
static int32_t mmOpen(SMgmtWrapper *pWrapper) { static int32_t mmOpen(SMgmtWrapper *pWrapper) { return mmOpenFromMsg(pWrapper, NULL); }
return mmOpenFromMsg(pWrapper, NULL);
}
static int32_t mmStart(SMgmtWrapper *pWrapper) { static int32_t mmStart(SMgmtWrapper *pWrapper) {
dDebug("mnode-mgmt start to run"); dDebug("mnode-mgmt start to run");
...@@ -258,7 +255,7 @@ int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encry ...@@ -258,7 +255,7 @@ int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encry
} }
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
} }
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
int32_t mndInitCluster(SMnode *pMnode); int32_t mndInitCluster(SMnode *pMnode);
void mndCleanupCluster(SMnode *pMnode); void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len); int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -100,7 +100,6 @@ typedef struct { ...@@ -100,7 +100,6 @@ typedef struct {
} SGrantInfo; } SGrantInfo;
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "mndCluster.h" #include "mndCluster.h"
#include "mndShow.h" #include "mndShow.h"
#define TSDB_CLUSTER_VER_NUMBE 1 #define TSDB_CLUSTER_VER_NUMBE 1
#define TSDB_CLUSTER_RESERVE_SIZE 64 #define TSDB_CLUSTER_RESERVE_SIZE 64
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster); static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
...@@ -61,6 +61,23 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) { ...@@ -61,6 +61,23 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
return 0; return 0;
} }
int64_t mndGetClusterId(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int64_t clusterId = -1;
while (1) {
SClusterObj *pCluster = NULL;
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
if (pIter == NULL) break;
clusterId = pCluster->id;
sdbRelease(pSdb, pCluster);
}
return clusterId;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -423,7 +423,7 @@ static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq) { ...@@ -423,7 +423,7 @@ static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq) {
DROP_QNODE_OVER: DROP_QNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }
mndReleaseQnode(pMnode, pObj); mndReleaseQnode(pMnode, pObj);
......
...@@ -433,7 +433,7 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) { ...@@ -433,7 +433,7 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) {
DROP_SNODE_OVER: DROP_SNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }
mndReleaseSnode(pMnode, pObj); mndReleaseSnode(pMnode, pObj);
......
...@@ -187,7 +187,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle ...@@ -187,7 +187,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
return 0; return 0;
} }
static int32_t mndInitSteps(SMnode *pMnode) { static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1; if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
...@@ -210,7 +210,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { ...@@ -210,7 +210,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (pMnode->clusterId <= 0) { if (deploy) {
if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
} else { } else {
if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
...@@ -263,23 +263,15 @@ static int32_t mndExecSteps(SMnode *pMnode) { ...@@ -263,23 +263,15 @@ static int32_t mndExecSteps(SMnode *pMnode) {
} }
} }
pMnode->clusterId = mndGetClusterId(pMnode);
return 0; return 0;
} }
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->dnodeId = pOption->dnodeId;
pMnode->clusterId = pOption->clusterId;
pMnode->replica = pOption->replica; pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb; pMnode->msgCb = pOption->msgCb;
if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1;
}
return 0;
} }
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
...@@ -294,6 +286,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { ...@@ -294,6 +286,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
char timestr[24] = "1970-01-01 00:00:00.00"; char timestr[24] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
mndSetOptions(pMnode, pOption);
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep)); pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
if (pMnode->pSteps == NULL) { if (pMnode->pSteps == NULL) {
...@@ -312,16 +305,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { ...@@ -312,16 +305,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL; return NULL;
} }
code = mndSetOptions(pMnode, pOption); code = mndInitSteps(pMnode, pOption->deploy);
if (code != 0) {
code = terrno;
mError("failed to open mnode since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndInitSteps(pMnode);
if (code != 0) { if (code != 0) {
code = terrno; code = terrno;
mError("failed to open mnode since %s", terrstr()); mError("failed to open mnode since %s", terrstr());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册