提交 a5a4da4d 编写于 作者: S Shengliang Guan

refactor: multi process mode

上级 b08f6b5e
...@@ -52,9 +52,9 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { ...@@ -52,9 +52,9 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pInput->dnodeId); dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pInput->pData->dnodeId);
return -1; return -1;
} }
......
...@@ -34,7 +34,7 @@ static void bmClose(SBnodeMgmt *pMgmt) { ...@@ -34,7 +34,7 @@ static void bmClose(SBnodeMgmt *pMgmt) {
dInfo("bnode-mgmt is cleaned up"); dInfo("bnode-mgmt is cleaned up");
} }
int32_t bmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("bnode-mgmt start to init"); dInfo("bnode-mgmt start to init");
SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt)); SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
...@@ -44,7 +44,7 @@ int32_t bmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -44,7 +44,7 @@ int32_t bmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->dnodeId; pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.pMgmt = pMgmt;
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
typedef struct SDnodeMgmt { typedef struct SDnodeMgmt {
struct SDnode *pDnode; struct SDnode *pDnode;
SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
...@@ -33,14 +34,8 @@ typedef struct SDnodeMgmt { ...@@ -33,14 +34,8 @@ typedef struct SDnodeMgmt {
ProcessCreateNodeFp processCreateNodeFp; ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
IsNodeRequiredFp isNodeRequiredFp; IsNodeRequiredFp isNodeRequiredFp;
SDnodeData data;
} SDnodeMgmt; } SDnodeMgmt;
// dmEps.c
int32_t dmReadEps(SDnodeMgmt *pMgmt);
int32_t dmWriteEps(SDnodeMgmt *pMgmt);
void dmUpdateEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
// dmHandle.c // dmHandle.c
SArray *dmGetMsgHandles(); SArray *dmGetMsgHandles();
void dmSendStatusReq(SDnodeMgmt *pMgmt); void dmSendStatusReq(SDnodeMgmt *pMgmt);
......
...@@ -17,30 +17,30 @@ ...@@ -17,30 +17,30 @@
#include "dmInt.h" #include "dmInt.h"
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->data.dnodeId == 0 || pMgmt->data.clusterId == 0) { if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->data.latch); taosWLockLatch(&pMgmt->pData->latch);
pMgmt->data.dnodeId = pCfg->dnodeId; pMgmt->pData->dnodeId = pCfg->dnodeId;
pMgmt->data.clusterId = pCfg->clusterId; pMgmt->pData->clusterId = pCfg->clusterId;
dmWriteEps(pMgmt); dmWriteEps(pMgmt->pData);
taosWUnLockLatch(&pMgmt->data.latch); taosWUnLockLatch(&pMgmt->pData->latch);
} }
} }
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
if (pRsp->code != 0) { if (pRsp->code != 0) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->data.dropped && pMgmt->data.dnodeId > 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->data.dnodeId); dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
pMgmt->data.dropped = 1; pMgmt->pData->dropped = 1;
dmWriteEps(pMgmt); dmWriteEps(pMgmt->pData);
} }
} else { } else {
SStatusRsp statusRsp = {0}; SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen > 0 && if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->data.dnodeVer = statusRsp.dnodeVer; pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt, statusRsp.pDnodeEps); dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
} }
rpcFreeCont(pRsp->pCont); rpcFreeCont(pRsp->pCont);
tFreeSStatusRsp(&statusRsp); tFreeSStatusRsp(&statusRsp);
...@@ -50,17 +50,17 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { ...@@ -50,17 +50,17 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
void dmSendStatusReq(SDnodeMgmt *pMgmt) { void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SStatusReq req = {0}; SStatusReq req = {0};
taosRLockLatch(&pMgmt->data.latch); taosRLockLatch(&pMgmt->pData->latch);
req.sver = tsVersion; req.sver = tsVersion;
req.dnodeVer = pMgmt->data.dnodeVer; req.dnodeVer = pMgmt->pData->dnodeVer;
req.dnodeId = pMgmt->data.dnodeId; req.dnodeId = pMgmt->pData->dnodeId;
req.clusterId = pMgmt->data.clusterId; req.clusterId = pMgmt->pData->clusterId;
if (req.clusterId == 0) req.dnodeId = 0; if (req.clusterId == 0) req.dnodeId = 0;
req.rebootTime = pMgmt->data.rebootTime; req.rebootTime = pMgmt->pData->rebootTime;
req.updateTime = pMgmt->data.updateTime; req.updateTime = pMgmt->pData->updateTime;
req.numOfCores = tsNumOfCores; req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = pMgmt->data.supportVnodes; req.numOfSupportVnodes = pMgmt->pData->supportVnodes;
tstrncpy(req.dnodeEp, pMgmt->data.localEp, TSDB_EP_LEN); tstrncpy(req.dnodeEp, pMgmt->pData->localEp, TSDB_EP_LEN);
req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0; req.clusterCfg.checkTime = 0;
...@@ -69,24 +69,24 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -69,24 +69,24 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->data.latch); taosRUnLockLatch(&pMgmt->pData->latch);
SMonVloadInfo vinfo = {0}; SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo); dmGetVnodeLoads(pMgmt, &vinfo);
req.pVloads = vinfo.pVloads; req.pVloads = vinfo.pVloads;
pMgmt->data.unsyncedVgId = 0; pMgmt->pData->unsyncedVgId = 0;
pMgmt->data.vndState = TAOS_SYNC_STATE_LEADER; pMgmt->pData->vndState = TAOS_SYNC_STATE_LEADER;
for (int32_t i = 0; i < taosArrayGetSize(req.pVloads); ++i) { for (int32_t i = 0; i < taosArrayGetSize(req.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(req.pVloads, i); SVnodeLoad *pLoad = taosArrayGet(req.pVloads, i);
if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) { if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) {
pMgmt->data.unsyncedVgId = pLoad->vgId; pMgmt->pData->unsyncedVgId = pLoad->vgId;
pMgmt->data.vndState = pLoad->syncState; pMgmt->pData->vndState = pLoad->syncState;
} }
} }
SMonMloadInfo minfo = {0}; SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo); dmGetMnodeLoads(pMgmt, &minfo);
pMgmt->data.mndState = minfo.load.syncState; pMgmt->pData->mndState = minfo.load.syncState;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);
......
...@@ -27,12 +27,12 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { ...@@ -27,12 +27,12 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
} }
static void dmStopMgmt(SDnodeMgmt *pMgmt) { static void dmStopMgmt(SDnodeMgmt *pMgmt) {
pMgmt->data.stopped = true; pMgmt->pData->stopped = true;
dmStopMonitorThread(pMgmt); dmStopMonitorThread(pMgmt);
dmStopStatusThread(pMgmt); dmStopStatusThread(pMgmt);
} }
static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("dnode-mgmt start to init"); dInfo("dnode-mgmt start to init");
SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt)); SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
...@@ -40,18 +40,6 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) ...@@ -40,18 +40,6 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput)
return -1; return -1;
} }
pMgmt->data.dnodeId = 0;
pMgmt->data.clusterId = 0;
pMgmt->data.dnodeVer = 0;
pMgmt->data.updateTime = 0;
pMgmt->data.rebootTime = taosGetTimestampMs();
pMgmt->data.dropped = 0;
pMgmt->data.localEp = pInput->localEp;
pMgmt->data.localFqdn = pInput->localFqdn;
pMgmt->data.firstEp = pInput->firstEp;
pMgmt->data.secondEp = pInput->secondEp;
pMgmt->data.supportVnodes = pInput->supportVnodes;
pMgmt->data.serverPort = pInput->serverPort;
pMgmt->pDnode = pInput->pDnode; pMgmt->pDnode = pInput->pDnode;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
...@@ -59,21 +47,21 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) ...@@ -59,21 +47,21 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput)
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp; pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp; pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp;
taosInitRWLatch(&pMgmt->data.latch); taosInitRWLatch(&pMgmt->pData->latch);
pMgmt->data.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pMgmt->pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->data.dnodeHash == NULL) { if (pMgmt->pData->dnodeHash == NULL) {
dError("failed to init dnode hash"); dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if (dmReadEps(pMgmt) != 0) { if (dmReadEps(pMgmt->pData) != 0) {
dError("failed to read file since %s", terrstr()); dError("failed to read file since %s", terrstr());
return -1; return -1;
} }
if (pMgmt->data.dropped) { if (pMgmt->pData->dropped) {
dError("dnode will not start since its already dropped"); dError("dnode will not start since its already dropped");
return -1; return -1;
} }
...@@ -82,12 +70,11 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) ...@@ -82,12 +70,11 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput)
return -1; return -1;
} }
if (udfStartUdfd(pMgmt->data.dnodeId) != 0) { if (udfStartUdfd(pMgmt->pData->dnodeId) != 0) {
dError("failed to start udfd"); dError("failed to start udfd");
} }
pOutput->pMgmt = pMgmt; pOutput->pMgmt = pMgmt;
pOutput->mnodeEps = pMgmt->data.mnodeEps;
dInfo("dnode-mgmt is initialized"); dInfo("dnode-mgmt is initialized");
return 0; return 0;
} }
...@@ -96,16 +83,16 @@ static void dmCloseMgmt(SDnodeMgmt *pMgmt) { ...@@ -96,16 +83,16 @@ static void dmCloseMgmt(SDnodeMgmt *pMgmt) {
dInfo("dnode-mgmt start to clean up"); dInfo("dnode-mgmt start to clean up");
dmStopWorker(pMgmt); dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->data.latch); taosWLockLatch(&pMgmt->pData->latch);
if (pMgmt->data.dnodeEps != NULL) { if (pMgmt->pData->dnodeEps != NULL) {
taosArrayDestroy(pMgmt->data.dnodeEps); taosArrayDestroy(pMgmt->pData->dnodeEps);
pMgmt->data.dnodeEps = NULL; pMgmt->pData->dnodeEps = NULL;
} }
if (pMgmt->data.dnodeHash != NULL) { if (pMgmt->pData->dnodeHash != NULL) {
taosHashCleanup(pMgmt->data.dnodeHash); taosHashCleanup(pMgmt->pData->dnodeHash);
pMgmt->data.dnodeHash = NULL; pMgmt->pData->dnodeHash = NULL;
} }
taosWUnLockLatch(&pMgmt->data.latch); taosWUnLockLatch(&pMgmt->pData->latch);
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
dInfo("dnode-mgmt is cleaned up"); dInfo("dnode-mgmt is cleaned up");
......
...@@ -16,30 +16,30 @@ ...@@ -16,30 +16,30 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmInt.h" #include "dmInt.h"
#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \ #define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \
if (!tsMultiProcess) { \ if (!tsMultiProcess) { \
SRpcMsg rsp = {0}; \ SRpcMsg rsp = {0}; \
SRpcMsg req = {.msgType = mtype}; \ SRpcMsg req = {.msgType = mtype}; \
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
tstrncpy(epset.eps[0].fqdn, pMgmt->data.localFqdn, TSDB_FQDN_LEN); \ tstrncpy(epset.eps[0].fqdn, pMgmt->pData->localFqdn, TSDB_FQDN_LEN); \
epset.eps[0].port = pMgmt->data.serverPort; \ epset.eps[0].port = pMgmt->pData->serverPort; \
\ \
rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \ rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \
if (rsp.code == 0 && rsp.contLen > 0) { \ if (rsp.code == 0 && rsp.contLen > 0) { \
func(rsp.pCont, rsp.contLen, pInfo); \ func(rsp.pCont, rsp.contLen, pInfo); \
} \ } \
rpcFreeCont(rsp.pCont); \ rpcFreeCont(rsp.pCont); \
} }
static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) { static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) {
pInfo->protocol = 1; pInfo->protocol = 1;
pInfo->dnode_id = pMgmt->data.dnodeId; pInfo->dnode_id = pMgmt->pData->dnodeId;
pInfo->cluster_id = pMgmt->data.clusterId; pInfo->cluster_id = pMgmt->pData->clusterId;
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
} }
static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) { static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pMgmt->data.rebootTime) / (86400000.0f); pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f);
pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, MNODE); pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, MNODE);
pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, QNODE); pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, QNODE);
pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, SNODE); pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, SNODE);
......
...@@ -24,7 +24,7 @@ static void *dmStatusThreadFp(void *param) { ...@@ -24,7 +24,7 @@ static void *dmStatusThreadFp(void *param) {
while (1) { while (1) {
taosMsleep(200); taosMsleep(200);
if (pMgmt->data.dropped || pMgmt->data.stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs(); int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f; float interval = (curTime - lastTime) / 1000.0f;
...@@ -45,7 +45,7 @@ static void *dmMonitorThreadFp(void *param) { ...@@ -45,7 +45,7 @@ static void *dmMonitorThreadFp(void *param) {
while (1) { while (1) {
taosMsleep(200); taosMsleep(200);
if (pMgmt->data.dropped || pMgmt->data.stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs(); int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f; float interval = (curTime - lastTime) / 1000.0f;
......
...@@ -81,7 +81,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { ...@@ -81,7 +81,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (createReq.replica <= 1 || (createReq.dnodeId != pInput->dnodeId && pInput->dnodeId != 0)) { if (createReq.replica <= 1 || (createReq.dnodeId != pInput->pData->dnodeId && pInput->pData->dnodeId != 0)) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
......
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
#include "wal.h" #include "wal.h"
static bool mmDeployRequired(const SMgmtInputOpt *pInput) { static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
if (pInput->dnodeId > 0) return false; if (pInput->pData->dnodeId > 0) return false;
if (pInput->clusterId > 0) return false; if (pInput->pData->clusterId > 0) return false;
if (strcmp(pInput->localEp, pInput->firstEp) != 0) return false; if (strcmp(pInput->pData->localEp, pInput->pData->firstEp) != 0) return false;
return true; return true;
} }
...@@ -44,8 +44,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu ...@@ -44,8 +44,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
pOption->selfIndex = 0; pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pInput->serverPort; pReplica->port = pInput->pData->serverPort;
tstrncpy(pReplica->fqdn, pInput->localFqdn, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, pInput->pData->localFqdn, TSDB_FQDN_LEN);
pOption->deploy = true; pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
...@@ -118,7 +118,7 @@ static void mmClose(SMnodeMgmt *pMgmt) { ...@@ -118,7 +118,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
dInfo("mnode-mgmt is cleaned up"); dInfo("mnode-mgmt is cleaned up");
} }
static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("mnode-mgmt start to init"); dInfo("mnode-mgmt start to init");
if (walInit() != 0) { if (walInit() != 0) {
dError("failed to init wal since %s", terrstr()); dError("failed to init wal since %s", terrstr());
...@@ -133,7 +133,7 @@ static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -133,7 +133,7 @@ static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->dnodeId; pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
...@@ -181,7 +181,7 @@ static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -181,7 +181,7 @@ static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
} }
} }
pOutput->dnodeId = pMgmt->dnodeId; pInput->pData->dnodeId = pMgmt->dnodeId;
pOutput->pMgmt = pMgmt; pOutput->pMgmt = pMgmt;
dInfo("mnode-mgmt is initialized"); dInfo("mnode-mgmt is initialized");
return 0; return 0;
......
...@@ -52,7 +52,7 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { ...@@ -52,7 +52,7 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr()); dError("failed to create qnode since %s", terrstr());
return -1; return -1;
......
...@@ -34,7 +34,7 @@ static void qmClose(SQnodeMgmt *pMgmt) { ...@@ -34,7 +34,7 @@ static void qmClose(SQnodeMgmt *pMgmt) {
dInfo("qnode-mgmt is cleaned up"); dInfo("qnode-mgmt is cleaned up");
} }
static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("qnode-mgmt start to init"); dInfo("qnode-mgmt start to init");
SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt)); SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
...@@ -44,7 +44,7 @@ static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -44,7 +44,7 @@ static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->dnodeId; pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue; pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue;
......
...@@ -52,7 +52,7 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { ...@@ -52,7 +52,7 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr()); dError("failed to create snode since %s", terrstr());
return -1; return -1;
......
...@@ -35,7 +35,7 @@ static void smClose(SSnodeMgmt *pMgmt) { ...@@ -35,7 +35,7 @@ static void smClose(SSnodeMgmt *pMgmt) {
dInfo("snode-mgmt is cleaned up"); dInfo("snode-mgmt is cleaned up");
} }
int32_t smOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("snode-mgmt start to init"); dInfo("snode-mgmt start to init");
SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt)); SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
...@@ -45,7 +45,7 @@ int32_t smOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -45,7 +45,7 @@ int32_t smOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->dnodeId; pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.pMgmt = pMgmt;
......
...@@ -244,7 +244,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { ...@@ -244,7 +244,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
dInfo("vnode-mgmt is cleaned up"); dInfo("vnode-mgmt is cleaned up");
} }
static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("vnode-mgmt start to init"); dInfo("vnode-mgmt start to init");
int32_t code = -1; int32_t code = -1;
...@@ -253,7 +253,7 @@ static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -253,7 +253,7 @@ static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->dnodeId; pMgmt->dnodeId = pInput->pData->dnodeId;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue;
...@@ -266,11 +266,11 @@ static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -266,11 +266,11 @@ static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
taosInitRWLatch(&pMgmt->latch); taosInitRWLatch(&pMgmt->latch);
SDiskCfg dCfg = {0}; SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pInput->dataDir, TSDB_FILENAME_LEN); tstrncpy(dCfg.dir, pInput->pData->dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0; dCfg.level = 0;
dCfg.primary = 1; dCfg.primary = 1;
SDiskCfg *pDisks = pInput->disks; SDiskCfg *pDisks = pInput->pData->disks;
int32_t numOfDisks = pInput->numOfDisks; int32_t numOfDisks = pInput->pData->numOfDisks;
if (numOfDisks <= 0 || pDisks == NULL) { if (numOfDisks <= 0 || pDisks == NULL) {
pDisks = &dCfg; pDisks = &dCfg;
numOfDisks = 1; numOfDisks = 1;
...@@ -333,7 +333,7 @@ _OVER: ...@@ -333,7 +333,7 @@ _OVER:
} }
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) { static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
*required = pInput->supportVnodes > 0; *required = pInput->pData->supportVnodes > 0;
return 0; return 0;
} }
......
...@@ -128,7 +128,7 @@ typedef struct SDnode { ...@@ -128,7 +128,7 @@ typedef struct SDnode {
SRWLatch latch; SRWLatch latch;
SEpSet mnodeEps; SEpSet mnodeEps;
TdFilePtr lockfile; TdFilePtr lockfile;
SMgmtInputOpt input; SDnodeData data;
SMgmtWrapper wrappers[NODE_END]; SMgmtWrapper wrappers[NODE_END];
} SDnode; } SDnode;
...@@ -140,16 +140,14 @@ void dmCloseNode(SMgmtWrapper *pWrapper); ...@@ -140,16 +140,14 @@ void dmCloseNode(SMgmtWrapper *pWrapper);
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType); SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper); int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
void dmReleaseWrapper(SMgmtWrapper *pWrapper); void dmReleaseWrapper(SMgmtWrapper *pWrapper);
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
void dmSetEvent(SDnode *pDnode, EDndEvent event); void dmSetEvent(SDnode *pDnode, EDndEvent event);
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc); void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc);
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
// dmProc.c // dmProc.c
int32_t dmInitProc(struct SMgmtWrapper *pWrapper); int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
...@@ -164,13 +162,13 @@ void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const ...@@ -164,13 +162,13 @@ void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const
EProcFuncType ftype); EProcFuncType ftype);
// dmTransport.c // dmTransport.c
int32_t dmInitServer(SDnode *pDnode); int32_t dmInitServer(SDnode *pDnode);
void dmCleanupServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode);
int32_t dmInitClient(SDnode *pDnode); int32_t dmInitClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode); void dmCleanupClient(SDnode *pDnode);
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
int32_t dmInitMsgHandle(SDnode *pDnode); int32_t dmInitMsgHandle(SDnode *pDnode);
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// mgmt nodes // mgmt nodes
SMgmtFunc dmGetMgmtFunc(); SMgmtFunc dmGetMgmtFunc();
......
...@@ -19,12 +19,10 @@ ...@@ -19,12 +19,10 @@
static bool dmIsNodeRequired(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; } static bool dmIsNodeRequired(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; }
static bool dmRequireNode(SMgmtWrapper *pWrapper) { static bool dmRequireNode(SMgmtWrapper *pWrapper) {
SMgmtInputOpt *pInput = &pWrapper->pDnode->input; SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
bool required = false; bool required = false;
int32_t code = (*pWrapper->func.requiredFp)(pInput, &required); int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
if (!required) { if (!required) {
dDebug("node:%s, does not require startup", pWrapper->name); dDebug("node:%s, does not require startup", pWrapper->name);
} }
...@@ -55,24 +53,24 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -55,24 +53,24 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
dInfo("dnode will run in child-process mode, node:%s", pWrapper->name); dInfo("dnode will run in child-process mode, node:%s", pWrapper->name);
} }
pDnode->input.dnodeId = 0; pDnode->data.dnodeId = 0;
pDnode->input.clusterId = 0; pDnode->data.clusterId = 0;
pDnode->input.localEp = strdup(pOption->localEp); pDnode->data.dnodeVer = 0;
pDnode->input.localFqdn = strdup(pOption->localFqdn); pDnode->data.updateTime = 0;
pDnode->input.firstEp = strdup(pOption->firstEp); pDnode->data.rebootTime = taosGetTimestampMs();
pDnode->input.secondEp = strdup(pOption->secondEp); pDnode->data.dropped = 0;
pDnode->input.serverPort = pOption->serverPort; pDnode->data.localEp = strdup(pOption->localEp);
pDnode->input.supportVnodes = pOption->numOfSupportVnodes; pDnode->data.localFqdn = strdup(pOption->localFqdn);
pDnode->input.numOfDisks = pOption->numOfDisks; pDnode->data.firstEp = strdup(pOption->firstEp);
pDnode->input.disks = pOption->disks; pDnode->data.secondEp = strdup(pOption->secondEp);
pDnode->input.dataDir = strdup(pOption->dataDir); pDnode->data.serverPort = pOption->serverPort;
pDnode->input.pDnode = pDnode; pDnode->data.supportVnodes = pOption->numOfSupportVnodes;
pDnode->input.processCreateNodeFp = dmProcessCreateNodeReq; pDnode->data.numOfDisks = pOption->numOfDisks;
pDnode->input.processDropNodeFp = dmProcessDropNodeReq; pDnode->data.disks = pOption->disks;
pDnode->input.isNodeRequiredFp = dmIsNodeRequired; pDnode->data.dataDir = strdup(pOption->dataDir);
if (pDnode->input.dataDir == NULL || pDnode->input.localEp == NULL || pDnode->input.localFqdn == NULL || if (pDnode->data.dataDir == NULL || pDnode->data.localEp == NULL || pDnode->data.localFqdn == NULL ||
pDnode->input.firstEp == NULL || pDnode->input.secondEp == NULL) { pDnode->data.firstEp == NULL || pDnode->data.secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -92,11 +90,11 @@ static void dmClearVars(SDnode *pDnode) { ...@@ -92,11 +90,11 @@ static void dmClearVars(SDnode *pDnode) {
pDnode->lockfile = NULL; pDnode->lockfile = NULL;
} }
taosMemoryFreeClear(pDnode->input.localEp); taosMemoryFreeClear(pDnode->data.localEp);
taosMemoryFreeClear(pDnode->input.localFqdn); taosMemoryFreeClear(pDnode->data.localFqdn);
taosMemoryFreeClear(pDnode->input.firstEp); taosMemoryFreeClear(pDnode->data.firstEp);
taosMemoryFreeClear(pDnode->input.secondEp); taosMemoryFreeClear(pDnode->data.secondEp);
taosMemoryFreeClear(pDnode->input.dataDir); taosMemoryFreeClear(pDnode->data.dataDir);
taosThreadMutexDestroy(&pDnode->mutex); taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
...@@ -322,7 +320,7 @@ _OVER: ...@@ -322,7 +320,7 @@ _OVER:
rpcFreeCont(pReq->pCont); rpcFreeCont(pReq->pCont);
} }
int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { static int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) { if (pWrapper != NULL) {
dmReleaseWrapper(pWrapper); dmReleaseWrapper(pWrapper);
...@@ -340,12 +338,9 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs ...@@ -340,12 +338,9 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
return -1; return -1;
} }
SMgmtInputOpt *pInput = &pWrapper->pDnode->input; SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
pInput->msgCb = dmGetMsgcb(pWrapper);
int32_t code = (*pWrapper->func.createFp)(pInput, pMsg); int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
if (code != 0) { if (code != 0) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else { } else {
...@@ -360,7 +355,7 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs ...@@ -360,7 +355,7 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
return code; return code;
} }
int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { static int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) { if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED; terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
...@@ -387,4 +382,19 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) ...@@ -387,4 +382,19 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
} }
taosThreadMutexUnlock(&pDnode->mutex); taosThreadMutexUnlock(&pDnode->mutex);
return code; return code;
}
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
SMgmtInputOpt opt = {
.pDnode = pWrapper->pDnode,
.pData = &pWrapper->pDnode->data,
.processCreateNodeFp = dmProcessCreateNodeReq,
.processDropNodeFp = dmProcessDropNodeReq,
.isNodeRequiredFp = dmIsNodeRequired,
.name = pWrapper->name,
.path = pWrapper->path,
};
opt.msgCb = dmGetMsgcb(pWrapper);
return opt;
} }
\ No newline at end of file
...@@ -92,17 +92,14 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { ...@@ -92,17 +92,14 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
} }
SMgmtOutputOpt output = {0}; SMgmtOutputOpt output = {0};
SMgmtInputOpt *pInput = &pWrapper->pDnode->input; SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
pInput->msgCb = dmGetMsgcb(pWrapper);
if (pWrapper->ntype == DNODE || OnlyInChildProc(pWrapper->proc.ptype)) { if (pWrapper->ntype == DNODE || OnlyInChildProc(pWrapper->proc.ptype)) {
tmsgSetDefaultMsgCb(&pInput->msgCb); tmsgSetDefaultMsgCb(&input.msgCb);
} }
if (OnlyInSingleProc(pWrapper->proc.ptype)) { if (OnlyInSingleProc(pWrapper->proc.ptype)) {
if ((*pWrapper->func.openFp)(pInput, &output) != 0) { if ((*pWrapper->func.openFp)(&input, &output) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
...@@ -111,7 +108,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { ...@@ -111,7 +108,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
} }
if (InChildProc(pWrapper->proc.ptype)) { if (InChildProc(pWrapper->proc.ptype)) {
if ((*pWrapper->func.openFp)(pInput, &output) != 0) { if ((*pWrapper->func.openFp)(&input, &output) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
...@@ -138,15 +135,9 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { ...@@ -138,15 +135,9 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, has been opened in parent process", pWrapper->name); dDebug("node:%s, has been opened in parent process", pWrapper->name);
} }
if (output.dnodeId != 0) {
pInput->dnodeId = output.dnodeId;
}
if (output.pMgmt != NULL) { if (output.pMgmt != NULL) {
pWrapper->pMgmt = output.pMgmt; pWrapper->pMgmt = output.pMgmt;
} }
if (output.mnodeEps.numOfEps != 0) {
pWrapper->pDnode->mnodeEps = output.mnodeEps;
}
dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned"); dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned");
return 0; return 0;
...@@ -254,8 +245,8 @@ static void dmWatchNodes(SDnode *pDnode) { ...@@ -254,8 +245,8 @@ static void dmWatchNodes(SDnode *pDnode) {
taosThreadMutexLock(&pDnode->mutex); taosThreadMutexLock(&pDnode->mutex);
for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SProc *proc = &pWrapper->proc; SProc *proc = &pWrapper->proc;
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (!InParentProc(proc->ptype)) continue; if (!InParentProc(proc->ptype)) continue;
......
...@@ -240,7 +240,7 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { ...@@ -240,7 +240,7 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->input.localFqdn) == 0 && epSet.eps[i].port == pDnode->input.serverPort) { if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps; epSet.inUse = (i + 1) % epSet.numOfEps;
} }
...@@ -453,8 +453,8 @@ int32_t dmInitServer(SDnode *pDnode) { ...@@ -453,8 +453,8 @@ int32_t dmInitServer(SDnode *pDnode) {
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
strncpy(rpcInit.localFqdn, pDnode->input.localFqdn, strlen(pDnode->input.localFqdn)); strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn));
rpcInit.localPort = pDnode->input.serverPort; rpcInit.localPort = pDnode->data.serverPort;
rpcInit.label = "DND"; rpcInit.label = "DND";
rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = (RpcCfp)dmProcessMsg; rpcInit.cfp = (RpcCfp)dmProcessMsg;
......
...@@ -84,35 +84,50 @@ typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype ...@@ -84,35 +84,50 @@ typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype
typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
typedef bool (*IsNodeRequiredFp)(struct SDnode *pDnode, EDndNodeType ntype); typedef bool (*IsNodeRequiredFp)(struct SDnode *pDnode, EDndNodeType ntype);
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
int32_t unsyncedVgId;
ESyncState vndState;
ESyncState mndState;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
SRWLatch latch;
SMsgCb msgCb;
const char *localEp;
const char *localFqdn;
const char *firstEp;
const char *secondEp;
int32_t supportVnodes;
uint16_t serverPort;
int32_t numOfDisks;
SDiskCfg *disks;
const char *dataDir;
} SDnodeData;
typedef struct { typedef struct {
const char *path; const char *path;
const char *name; const char *name;
SMsgCb msgCb;
int32_t dnodeId;
int64_t clusterId;
const char *localEp;
const char *firstEp;
const char *secondEp;
const char *localFqdn;
uint16_t serverPort;
int32_t supportVnodes;
int32_t numOfDisks;
SDiskCfg *disks;
const char *dataDir;
struct SDnode *pDnode; struct SDnode *pDnode;
SDnodeData *pData;
SMsgCb msgCb;
ProcessCreateNodeFp processCreateNodeFp; ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
IsNodeRequiredFp isNodeRequiredFp; IsNodeRequiredFp isNodeRequiredFp;
} SMgmtInputOpt; } SMgmtInputOpt;
typedef struct { typedef struct {
int32_t dnodeId; void *pMgmt;
void *pMgmt;
SEpSet mnodeEps;
} SMgmtOutputOpt; } SMgmtOutputOpt;
typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg); typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg);
typedef int32_t (*NodeOpenFp)(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput); typedef int32_t (*NodeOpenFp)(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput);
typedef void (*NodeCloseFp)(void *pMgmt); typedef void (*NodeCloseFp)(void *pMgmt);
typedef int32_t (*NodeStartFp)(void *pMgmt); typedef int32_t (*NodeStartFp)(void *pMgmt);
typedef void (*NodeStopFp)(void *pMgmt); typedef void (*NodeStopFp)(void *pMgmt);
...@@ -155,30 +170,10 @@ TdFilePtr dmCheckRunning(const char *dataDir); ...@@ -155,30 +170,10 @@ TdFilePtr dmCheckRunning(const char *dataDir);
int32_t dmReadShmFile(const char *path, const char *name, EDndNodeType runType, SShm *pShm); int32_t dmReadShmFile(const char *path, const char *name, EDndNodeType runType, SShm *pShm);
int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm); int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm);
// common define // dmEps.c
typedef struct { int32_t dmReadEps(SDnodeData *pData);
int32_t dnodeId; int32_t dmWriteEps(SDnodeData *pData);
int64_t clusterId; void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
int32_t unsyncedVgId;
ESyncState vndState;
ESyncState mndState;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
SRWLatch latch;
SMsgCb msgCb;
const char *localEp;
const char *localFqdn;
const char *firstEp;
const char *secondEp;
int32_t supportVnodes;
uint16_t serverPort;
} SDnodeData;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -14,16 +14,16 @@ ...@@ -14,16 +14,16 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmInt.h" #include "dmUtil.h"
static void dmPrintEps(SDnodeMgmt *pMgmt); static void dmPrintEps(SDnodeData *pData);
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep); static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps); static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
static void dmGetDnodeEp(SDnodeMgmt *pMgmt, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
taosRLockLatch(&pMgmt->data.latch); taosRLockLatch(&pData->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->data.dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
if (pPort != NULL) { if (pPort != NULL) {
*pPort = pDnodeEp->ep.port; *pPort = pDnodeEp->ep.port;
...@@ -36,10 +36,10 @@ static void dmGetDnodeEp(SDnodeMgmt *pMgmt, int32_t dnodeId, char *pEp, char *pF ...@@ -36,10 +36,10 @@ static void dmGetDnodeEp(SDnodeMgmt *pMgmt, int32_t dnodeId, char *pEp, char *pF
} }
} }
taosRUnLockLatch(&pMgmt->data.latch); taosRUnLockLatch(&pData->latch);
} }
int32_t dmReadEps(SDnodeMgmt *pMgmt) { int32_t dmReadEps(SDnodeData *pData) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 256 * 1024; int32_t maxLen = 256 * 1024;
...@@ -48,13 +48,13 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) { ...@@ -48,13 +48,13 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
pMgmt->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pMgmt->data.dnodeEps == NULL) { if (pData->dnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno)); dError("failed to calloc dnodeEp array since %s", strerror(errno));
goto _OVER; goto _OVER;
} }
snprintf(file, sizeof(file), "%s%sdnode.json", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", pData->dataDir, TD_DIRSEP, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_READ); pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
code = 0; code = 0;
...@@ -79,21 +79,21 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) { ...@@ -79,21 +79,21 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) {
dError("failed to read %s since dnodeId not found", file); dError("failed to read %s since dnodeId not found", file);
goto _OVER; goto _OVER;
} }
pMgmt->data.dnodeId = dnodeId->valueint; pData->dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) { if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", file); dError("failed to read %s since clusterId not found", file);
goto _OVER; goto _OVER;
} }
pMgmt->data.clusterId = atoll(clusterId->valuestring); pData->clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) { if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file); dError("failed to read %s since dropped not found", file);
goto _OVER; goto _OVER;
} }
pMgmt->data.dropped = dropped->valueint; pData->dropped = dropped->valueint;
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
if (!dnodes || dnodes->type != cJSON_Array) { if (!dnodes || dnodes->type != cJSON_Array) {
...@@ -143,29 +143,29 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) { ...@@ -143,29 +143,29 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) {
} }
dnodeEp.isMnode = isMnode->valueint; dnodeEp.isMnode = isMnode->valueint;
taosArrayPush(pMgmt->data.dnodeEps, &dnodeEp); taosArrayPush(pData->dnodeEps, &dnodeEp);
} }
code = 0; code = 0;
dDebug("succcessed to read file %s", file); dDebug("succcessed to read file %s", file);
dmPrintEps(pMgmt); dmPrintEps(pData);
_OVER: _OVER:
if (content != NULL) taosMemoryFree(content); if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
if (taosArrayGetSize(pMgmt->data.dnodeEps) == 0) { if (taosArrayGetSize(pData->dnodeEps) == 0) {
SDnodeEp dnodeEp = {0}; SDnodeEp dnodeEp = {0};
dnodeEp.isMnode = 1; dnodeEp.isMnode = 1;
taosGetFqdnPortFromEp(pMgmt->data.firstEp, &dnodeEp.ep); taosGetFqdnPortFromEp(pData->firstEp, &dnodeEp.ep);
taosArrayPush(pMgmt->data.dnodeEps, &dnodeEp); taosArrayPush(pData->dnodeEps, &dnodeEp);
} }
dmResetEps(pMgmt, pMgmt->data.dnodeEps); dmResetEps(pData, pData->dnodeEps);
if (dmIsEpChanged(pMgmt, pMgmt->data.dnodeId, pMgmt->data.localEp)) { if (dmIsEpChanged(pData, pData->dnodeId, pData->localEp)) {
dError("localEp %s different with %s and need reconfigured", pMgmt->data.localEp, file); dError("localEp %s different with %s and need reconfigured", pData->localEp, file);
return -1; return -1;
} }
...@@ -173,11 +173,12 @@ _OVER: ...@@ -173,11 +173,12 @@ _OVER:
return code; return code;
} }
int32_t dmWriteEps(SDnodeMgmt *pMgmt) { int32_t dmWriteEps(SDnodeData *pData) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", pData->dataDir, TD_DIRSEP, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", pData->dataDir, TD_DIRSEP, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
...@@ -191,14 +192,14 @@ int32_t dmWriteEps(SDnodeMgmt *pMgmt) { ...@@ -191,14 +192,14 @@ int32_t dmWriteEps(SDnodeMgmt *pMgmt) {
char *content = taosMemoryCalloc(1, maxLen + 1); char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->data.dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->data.clusterId); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->data.dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps); int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
for (int32_t i = 0; i < numOfEps; ++i) { for (int32_t i = 0; i < numOfEps; ++i) {
SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->data.dnodeEps, i); SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port);
...@@ -222,41 +223,41 @@ int32_t dmWriteEps(SDnodeMgmt *pMgmt) { ...@@ -222,41 +223,41 @@ int32_t dmWriteEps(SDnodeMgmt *pMgmt) {
return -1; return -1;
} }
pMgmt->data.updateTime = taosGetTimestampMs(); pData->updateTime = taosGetTimestampMs();
dDebug("successed to write %s", realfile); dDebug("successed to write %s", realfile);
return 0; return 0;
} }
void dmUpdateEps(SDnodeMgmt *pMgmt, SArray *eps) { void dmUpdateEps(SDnodeData *pData, SArray *eps) {
int32_t numOfEps = taosArrayGetSize(eps); int32_t numOfEps = taosArrayGetSize(eps);
if (numOfEps <= 0) return; if (numOfEps <= 0) return;
taosWLockLatch(&pMgmt->data.latch); taosWLockLatch(&pData->latch);
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps); int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps);
if (numOfEps != numOfEpsOld) { if (numOfEps != numOfEpsOld) {
dmResetEps(pMgmt, eps); dmResetEps(pData, eps);
dmWriteEps(pMgmt); dmWriteEps(pData);
} else { } else {
int32_t size = numOfEps * sizeof(SDnodeEp); int32_t size = numOfEps * sizeof(SDnodeEp);
if (memcmp(pMgmt->data.dnodeEps->pData, eps->pData, size) != 0) { if (memcmp(pData->dnodeEps->pData, eps->pData, size) != 0) {
dmResetEps(pMgmt, eps); dmResetEps(pData, eps);
dmWriteEps(pMgmt); dmWriteEps(pData);
} }
} }
taosWUnLockLatch(&pMgmt->data.latch); taosWUnLockLatch(&pData->latch);
} }
static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) { static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
if (pMgmt->data.dnodeEps != dnodeEps) { if (pData->dnodeEps != dnodeEps) {
SArray *tmp = pMgmt->data.dnodeEps; SArray *tmp = pData->dnodeEps;
pMgmt->data.dnodeEps = taosArrayDup(dnodeEps); pData->dnodeEps = taosArrayDup(dnodeEps);
taosArrayDestroy(tmp); taosArrayDestroy(tmp);
} }
pMgmt->data.mnodeEps.inUse = 0; pData->mnodeEps.inUse = 0;
pMgmt->data.mnodeEps.numOfEps = 0; pData->mnodeEps.numOfEps = 0;
int32_t mIndex = 0; int32_t mIndex = 0;
int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps); int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);
...@@ -265,35 +266,35 @@ static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) { ...@@ -265,35 +266,35 @@ static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) {
SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i); SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
if (!pDnodeEp->isMnode) continue; if (!pDnodeEp->isMnode) continue;
if (mIndex >= TSDB_MAX_REPLICA) continue; if (mIndex >= TSDB_MAX_REPLICA) continue;
pMgmt->data.mnodeEps.numOfEps++; pData->mnodeEps.numOfEps++;
pMgmt->data.mnodeEps.eps[mIndex] = pDnodeEp->ep; pData->mnodeEps.eps[mIndex] = pDnodeEp->ep;
mIndex++; mIndex++;
} }
for (int32_t i = 0; i < numOfEps; i++) { for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i); SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
taosHashPut(pMgmt->data.dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
} }
dmPrintEps(pMgmt); dmPrintEps(pData);
} }
static void dmPrintEps(SDnodeMgmt *pMgmt) { static void dmPrintEps(SDnodeData *pData) {
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps); int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
dDebug("print dnode ep list, num:%d", numOfEps); dDebug("print dnode ep list, num:%d", numOfEps);
for (int32_t i = 0; i < numOfEps; i++) { for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pEp = taosArrayGet(pMgmt->data.dnodeEps, i); SDnodeEp *pEp = taosArrayGet(pData->dnodeEps, i);
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
} }
} }
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) { static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
bool changed = false; bool changed = false;
if (dnodeId == 0) return changed; if (dnodeId == 0) return changed;
taosRLockLatch(&pMgmt->data.latch); taosRLockLatch(&pData->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->data.dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
char epstr[TSDB_EP_LEN + 1] = {0}; char epstr[TSDB_EP_LEN + 1] = {0};
snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
...@@ -303,6 +304,6 @@ static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) { ...@@ -303,6 +304,6 @@ static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) {
} }
} }
taosRUnLockLatch(&pMgmt->data.latch); taosRUnLockLatch(&pData->latch);
return changed; return changed;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册