提交 003864f9 编写于 作者: S Shengliang Guan

shm

上级 ba12601d
......@@ -48,17 +48,17 @@ typedef struct {
char secondEp[TSDB_EP_LEN];
SDiskCfg *pDisks;
int32_t numOfDisks;
} SDndCfg;
} SDnodeOpt;
typedef enum { DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent;
/**
* @brief Initialize and start the dnode.
*
* @param pCfg Config of the dnode.
* @param pOption Option of the dnode.
* @return SDnode* The dnode object.
*/
SDnode *dndCreate(SDndCfg *pCfg);
SDnode *dndCreate(const SDnodeOpt *pOption);
/**
* @brief Stop and cleanup the dnode.
......@@ -80,7 +80,7 @@ int32_t dndRun(SDnode *pDnode);
* @param pDnode The dnode object to close.
* @param event The event to handle.
*/
void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
#ifdef __cplusplus
}
......
......@@ -32,17 +32,16 @@ typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SDnode *pDnode;
PutReqToMWriteQFp putReqToMWriteQFp;
PutReqToMReadQFp putReqToMReadQFp;
SendReqToDnodeFp sendReqFp;
SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp;
int32_t dnodeId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SDnode *pDnode;
PutToQueueFp putReqToMWriteQFp;
PutToQueueFp putReqToMReadQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendReqToMnodeFp;
} SMnodeOpt;
/* ------------------------ SMnode ------------------------ */
......
......@@ -244,7 +244,7 @@ void tfsClosedir(STfsDir *pDir);
* @param pTfs The fs object.
* @param pInfo The info object.
*/
void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo);
int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo);
#ifdef __cplusplus
}
......
......@@ -53,6 +53,10 @@ typedef struct {
void *pNode;
} SNodeMsg;
typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq);
typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg);
typedef struct SRpcInit {
uint16_t localPort; // local port
char * label; // for debug purpose
......
......@@ -40,8 +40,8 @@ int32_t dndInitBnode(SDnode *pDnode);
void dndCleanupBnode(SDnode *pDnode);
void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -23,8 +23,6 @@ extern "C" {
#endif
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -180,8 +180,8 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
pOption->sver = tsVersion;
}
......@@ -268,7 +268,7 @@ int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.dnodeId != dmGetDnodeId(pDnode)) {
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to create bnode since %s", terrstr());
return -1;
......@@ -284,7 +284,7 @@ int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
......
......@@ -17,8 +17,8 @@
#include "bmMsg.h"
#include "bmWorker.h"
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;}
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;}
void bmInitMsgHandles(SMgmtWrapper *pWrapper) {
......
......@@ -43,7 +43,6 @@
#include "snode.h"
#include "tfs.h"
#include "vnode.h"
#include "monitor.h"
#ifdef __cplusplus
extern "C" {
......@@ -57,10 +56,10 @@ extern "C" {
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStat;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef struct SMgmtFp SMgmtFp;
typedef struct SMgmtWrapper SMgmtWrapper;
......@@ -121,11 +120,22 @@ typedef struct {
} STransMgmt;
typedef struct SDnode {
int64_t clusterId;
int32_t dnodeId;
int32_t numOfSupportVnodes;
int64_t rebootTime;
char *localEp;
char *localFqdn;
char *firstEp;
char *secondEp;
char *dataDir;
SDiskCfg *pDisks;
int32_t numOfDisks;
uint16_t serverPort;
bool dropped;
EDndStatus status;
EDndEvent event;
EProcType procType;
SDndCfg cfg;
SStartupReq startup;
TdFilePtr pLockFile;
STransMgmt trans;
......@@ -149,16 +159,16 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dndSendMonitorReport(SDnode *pDnode);
// dndNode.h
SDnode *dndCreate(SDndCfg *pCfg);
SDnode *dndCreate(const SDnodeOpt *pOption);
void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
void dndSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
// dndTransport.h
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
int32_t dndSendReqToMnode(void *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(void *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
// dndWorker.h
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
......
......@@ -22,10 +22,10 @@
extern "C" {
#endif
SDnode *dndCreate(SDndCfg *pCfg);
SDnode *dndCreate(const SDnodeOpt *pOption);
void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
......
......@@ -28,8 +28,8 @@ int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode);
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -25,16 +25,14 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo);
return 0;
return vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo);
}
static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->dnode_id = dmGetDnodeId(pDnode);
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
pInfo->cluster_id = dmGetClusterId(pDnode);
pInfo->protocol = 1;
pInfo->dnode_id = pDnode->dnodeId;
pInfo->cluster_id = pDnode->clusterId;
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
}
static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
......@@ -56,7 +54,7 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
void dndSendMonitorReport(SDnode *pDnode) {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("pDnode:%p, send monitor report to %s:%u", pDnode, tsMonitorFqdn, tsMonitorPort);
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SMonInfo *pMonitor = monCreateMonitorInfo();
if (pMonitor == NULL) return;
......
......@@ -48,6 +48,7 @@ static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openF
static void dndCloseNode(SMgmtWrapper *pWrapper) {
if (pWrapper->required) {
(*pWrapper->fp.closeFp)(pWrapper);
pWrapper->required = false;
}
if (pWrapper->pProc) {
taosProcCleanup(pWrapper->pProc);
......@@ -55,6 +56,26 @@ static void dndCloseNode(SMgmtWrapper *pWrapper) {
}
}
static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
pDnode->serverPort = pOption->serverPort;
pDnode->dataDir = strdup(pOption->dataDir);
pDnode->localEp = strdup(pOption->localEp);
pDnode->localFqdn = strdup(pOption->localFqdn);
pDnode->firstEp = strdup(pOption->firstEp);
pDnode->secondEp = strdup(pOption->secondEp);
pDnode->pDisks = pOption->pDisks;
pDnode->numOfDisks = pOption->numOfDisks;
pDnode->rebootTime = taosGetTimestampMs();
if (pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL ||
pDnode->dataDir == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndClearMemory(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
......@@ -65,11 +86,16 @@ static void dndClearMemory(SDnode *pDnode) {
taosCloseFile(&pDnode->pLockFile);
pDnode->pLockFile = NULL;
}
tfree(pDnode);
tfree(pDnode->localEp);
tfree(pDnode->localFqdn);
tfree(pDnode->firstEp);
tfree(pDnode->secondEp);
tfree(pDnode->dataDir);
free(pDnode);
dDebug("dnode object memory is cleared, data:%p", pDnode);
}
SDnode *dndCreate(SDndCfg *pCfg) {
SDnode *dndCreate(const SDnodeOpt *pOption) {
dInfo("start to create dnode object");
int32_t code = -1;
char path[PATH_MAX + 100];
......@@ -81,10 +107,12 @@ SDnode *dndCreate(SDndCfg *pCfg) {
goto _OVER;
}
memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg));
if (dndInitMemory(pDnode, pOption) != 0) {
goto _OVER;
}
dndSetStatus(pDnode, DND_STAT_INIT);
pDnode->rebootTime = taosGetTimestampMs();
pDnode->pLockFile = dndCheckRunning(pCfg->dataDir);
pDnode->pLockFile = dndCheckRunning(pDnode->dataDir);
if (pDnode->pLockFile == NULL) {
goto _OVER;
}
......@@ -112,10 +140,10 @@ SDnode *dndCreate(SDndCfg *pCfg) {
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
snprintf(path, sizeof(path), "%s%s%s", pCfg->dataDir, TD_DIRSEP, pDnode->wrappers[n].name);
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
pWrapper->pDnode = pDnode;
if (pDnode->wrappers[n].path == NULL) {
if (pWrapper->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
......@@ -201,7 +229,7 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(pWrapper->pDnode, pRsp);
dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp);
} else {
rpcSendResponse(pRsp);
}
......@@ -222,6 +250,11 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
}
}
void dndSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
pRsp->code = TSDB_CODE_APP_NOT_READY;
dndSendRsp(pWrapper, pRsp);
}
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from child queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
......@@ -354,7 +387,7 @@ int32_t dndRun(SDnode *pDnode) {
return 0;
}
void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode object receive event %d, data:%p", event, pDnode);
pDnode->event = event;
}
......@@ -375,7 +408,7 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE), pEpSet);
}
int32_t code = -1;
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dndTransport.h"
#include "dndNode.h"
#include "dmInt.h"
#include "mmInt.h"
......@@ -133,7 +134,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
......@@ -216,7 +217,7 @@ int32_t dndInitServer(SDnode *pDnode) {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->cfg.serverPort;
rpcInit.localPort = pDnode->serverPort;
rpcInit.label = "DND";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dndProcessRequest;
......@@ -271,8 +272,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
return 0;
}
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) {
STransMgmt *pMgmt = &pDnode->trans;
static int32_t dndSetReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE;
return -1;
......@@ -282,8 +282,18 @@ int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) {
return 0;
}
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pReq) {
int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = wrapper;
STransMgmt *pTrans = &pWrapper->pDnode->trans;
return dndSetReq(pTrans, pEpSet, pReq);
}
int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = wrapper;
SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
return dndSendReqToDnode(pDnode, &epSet, pReq);
dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet);
return dndSetReq(pTrans, &epSet, pReq);
}
......@@ -82,7 +82,7 @@ void dndCleanupWorker(SDnodeWorker *pWorker) {
}
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) {
if (pWorker == NULL || pWorker->queue == NULL) {
if (pWorker == NULL || pWorker->queue == NULL ) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
......
......@@ -13,24 +13,24 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_MSG_H_
#define _TD_DND_DNODE_MSG_H_
#ifndef _TD_DND_DNODE_H_
#define _TD_DND_DNODE_H_
#include "dmInt.h"
#include "dndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
void dmGetMgmtFp(SMgmtWrapper *pWrapper);
void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet);
void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeMgmt *pMgmt);
int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq);
void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);
void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp);
int32_t dmStart(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_MSG_H_*/
\ No newline at end of file
#endif /*_TD_DND_DNODE_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_FILE_H_
#define _TD_DND_DNODE_FILE_H_
#include "dmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t dmReadFile(SDnodeMgmt *pMgmt);
int32_t dmWriteFile(SDnodeMgmt *pMgmt);
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_FILE_H_*/
\ No newline at end of file
......@@ -16,24 +16,19 @@
#ifndef _TD_DND_DNODE_INT_H_
#define _TD_DND_DNODE_INT_H_
#include "dndInt.h"
#include "dm.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SDnodeMgmt {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
char localEp[TSDB_EP_LEN];
char firstEp[TSDB_EP_LEN];
int64_t dver;
int64_t updateTime;
int8_t statusSent;
SEpSet mnodeEpSet;
SHashObj *dnodeHash;
SArray *pDnodeEps;
SArray *dnodeEps;
pthread_t *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
......@@ -42,17 +37,24 @@ typedef struct SDnodeMgmt {
SDnode *pDnode;
} SDnodeMgmt;
// dmInt.h
void dmGetMgmtFp(SMgmtWrapper *pWrapper);
int32_t dmGetDnodeId(SDnode *pDnode);
int64_t dmGetClusterId(SDnode *pDnode);
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
// dmFile.c
int32_t dmReadFile(SDnodeMgmt *pMgmt);
int32_t dmWriteFile(SDnodeMgmt *pMgmt);
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
// dmWorker.h
// dmInt.c
// dmMsg.c
void dmSendStatusReq(SDnodeMgmt *pMgmt);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
// dmWorker.c
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
void dmStopWorker(SDnodeMgmt *pMgmt);
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_WORKER_H_
#define _TD_DND_DNODE_WORKER_H_
#include "dmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
void dmStopWorker(SDnodeMgmt *pMgmt);
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_WORKER_H_*/
\ No newline at end of file
......@@ -14,11 +14,11 @@
*/
#define _DEFAULT_SOURCE
#include "dmFile.h"
#include "dmInt.h"
static void dmPrintDnodes(SDnodeMgmt *pMgmt);
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep);
static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep);
static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps);
int32_t dmReadFile(SDnodeMgmt *pMgmt) {
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
......@@ -28,9 +28,10 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) {
cJSON *root = NULL;
char file[PATH_MAX];
TdFilePtr pFile = NULL;
SDnode *pDnode = pMgmt->pDnode;
pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pMgmt->pDnodeEps == NULL) {
pMgmt->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pMgmt->dnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno));
goto PRASE_DNODE_OVER;
}
......@@ -61,21 +62,21 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) {
dError("failed to read %s since dnodeId not found", file);
goto PRASE_DNODE_OVER;
}
pMgmt->dnodeId = dnodeId->valueint;
pDnode->dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", file);
goto PRASE_DNODE_OVER;
}
pMgmt->clusterId = atoll(clusterId->valuestring);
pDnode->clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_DNODE_OVER;
}
pMgmt->dropped = dropped->valueint;
pDnode->dropped = dropped->valueint;
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
if (!dnodes || dnodes->type != cJSON_Array) {
......@@ -125,7 +126,7 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) {
}
dnodeEp.isMnode = isMnode->valueint;
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
taosArrayPush(pMgmt->dnodeEps, &dnodeEp);
}
code = 0;
......@@ -137,25 +138,27 @@ PRASE_DNODE_OVER:
if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile);
if (dmIsEpChanged(pMgmt, pMgmt->pDnode->cfg.localEp)) {
dError("localEp %s different with %s and need reconfigured", pMgmt->pDnode->cfg.localEp, file);
if (dmIsEpChanged(pMgmt, pDnode->dnodeId, pDnode->localEp)) {
dError("localEp %s different with %s and need reconfigured", pDnode->localEp, file);
return -1;
}
if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) {
if (taosArrayGetSize(pMgmt->dnodeEps) == 0) {
SDnodeEp dnodeEp = {0};
dnodeEp.isMnode = 1;
taosGetFqdnPortFromEp(pMgmt->pDnode->cfg.firstEp, &dnodeEp.ep);
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
taosGetFqdnPortFromEp(pDnode->firstEp, &dnodeEp.ep);
taosArrayPush(pMgmt->dnodeEps, &dnodeEp);
}
dmResetDnodes(pMgmt, pMgmt->pDnodeEps);
dmResetDnodes(pMgmt, pMgmt->dnodeEps);
terrno = code;
return code;
}
int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP);
......@@ -171,14 +174,14 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pDnode->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pDnode->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->dnodeEps);
for (int32_t i = 0; i < numOfEps; ++i) {
SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->pDnodeEps, i);
SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->dnodeEps, i);
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port);
......@@ -210,20 +213,20 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
return 0;
}
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
int32_t numOfEps = taosArrayGetSize(pDnodeEps);
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) {
int32_t numOfEps = taosArrayGetSize(dnodeEps);
if (numOfEps <= 0) return;
taosWLockLatch(&pMgmt->latch);
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->dnodeEps);
if (numOfEps != numOfEpsOld) {
dmResetDnodes(pMgmt, pDnodeEps);
dmResetDnodes(pMgmt, dnodeEps);
dmWriteFile(pMgmt);
} else {
int32_t size = numOfEps * sizeof(SDnodeEp);
if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) {
dmResetDnodes(pMgmt, pDnodeEps);
if (memcmp(pMgmt->dnodeEps->pData, dnodeEps->pData, size) != 0) {
dmResetDnodes(pMgmt, dnodeEps);
dmWriteFile(pMgmt);
}
}
......@@ -231,10 +234,10 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
taosWUnLockLatch(&pMgmt->latch);
}
static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
if (pMgmt->pDnodeEps != pDnodeEps) {
SArray *tmp = pMgmt->pDnodeEps;
pMgmt->pDnodeEps = taosArrayDup(pDnodeEps);
static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps) {
if (pMgmt->dnodeEps != dnodeEps) {
SArray *tmp = pMgmt->dnodeEps;
pMgmt->dnodeEps = taosArrayDup(dnodeEps);
taosArrayDestroy(tmp);
}
......@@ -242,10 +245,10 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
pMgmt->mnodeEpSet.numOfEps = 0;
int32_t mIndex = 0;
int32_t numOfEps = (int32_t)taosArrayGetSize(pDnodeEps);
int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);
for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i);
SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
if (!pDnodeEp->isMnode) continue;
if (mIndex >= TSDB_MAX_REPLICA) continue;
pMgmt->mnodeEpSet.numOfEps++;
......@@ -255,7 +258,7 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
}
for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i);
SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
}
......@@ -263,19 +266,19 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
}
static void dmPrintDnodes(SDnodeMgmt *pMgmt) {
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps);
int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->dnodeEps);
dDebug("print dnode ep list, num:%d", numOfEps);
for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pEp = taosArrayGet(pMgmt->pDnodeEps, i);
SDnodeEp *pEp = taosArrayGet(pMgmt->dnodeEps, i);
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
}
}
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep) {
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) {
bool changed = false;
taosRLockLatch(&pMgmt->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &pMgmt->dnodeId, sizeof(int32_t));
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) {
char epstr[TSDB_EP_LEN + 1];
snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
......
......@@ -15,45 +15,18 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "dmFile.h"
#include "dmMsg.h"
#include "dmWorker.h"
int32_t dmGetDnodeId(SDnode *pDnode) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
int32_t dnodeId = pMgmt->dnodeId;
taosRUnLockLatch(&pMgmt->latch);
return dnodeId;
}
int64_t dmGetClusterId(SDnode *pDnode) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
int64_t clusterId = pMgmt->clusterId;
taosRUnLockLatch(&pMgmt->latch);
return clusterId;
}
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
*pEpSet = pMgmt->mnodeEpSet;
taosRUnLockLatch(&pMgmt->latch);
}
void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosWLockLatch(&pMgmt->latch);
pMgmt->mnodeEpSet = *pEpSet;
......@@ -64,10 +37,8 @@ void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosWUnLockLatch(&pMgmt->latch);
}
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
......@@ -86,16 +57,17 @@ void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint1
taosRUnLockLatch(&pMgmt->latch);
}
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
tmsg_t msgType = pReq->msgType;
void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode;
tmsg_t msgType = pReq->msgType;
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
dmGetMnodeEpSet(pWrapper, &epSet);
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) {
if (strcmp(epSet.eps[i].fqdn, pDnode->localFqdn) == 0 && epSet.eps[i].port == pDnode->serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
......@@ -105,18 +77,21 @@ void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
rpcSendRedirectRsp(pReq->handle, &epSet);
}
int32_t dmStart(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
return dmStartWorker(pMgmt);
}
int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
dInfo("dnode-mgmt is initialized");
pMgmt->dnodeId = 0;
pMgmt->dropped = 0;
pMgmt->clusterId = 0;
pDnode->dnodeId = 0;
pDnode->dropped = 0;
pDnode->clusterId = 0;
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pDnode;
memcpy(pMgmt->localEp, pDnode->cfg.localEp, TSDB_EP_LEN);
memcpy(pMgmt->firstEp, pDnode->cfg.firstEp, TSDB_EP_LEN);
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
......@@ -131,7 +106,7 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
return -1;
}
if (pMgmt->dropped) {
if (pDnode->dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
......@@ -150,9 +125,9 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
taosWLockLatch(&pMgmt->latch);
if (pMgmt->pDnodeEps != NULL) {
taosArrayDestroy(pMgmt->pDnodeEps);
pMgmt->pDnodeEps = NULL;
if (pMgmt->dnodeEps != NULL) {
taosArrayDestroy(pMgmt->dnodeEps);
pMgmt->dnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
......
......@@ -14,9 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dmMsg.h"
#include "dmFile.h"
#include "dmWorker.h"
#include "dmInt.h"
#include "vmInt.h"
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
......@@ -26,13 +24,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
taosRLockLatch(&pMgmt->latch);
req.sver = tsVersion;
req.dver = pMgmt->dver;
req.dnodeId = pMgmt->dnodeId;
req.clusterId = pMgmt->clusterId;
req.dnodeId = pDnode->dnodeId;
req.clusterId = pDnode->clusterId;
req.rebootTime = pDnode->rebootTime;
req.updateTime = pMgmt->updateTime;
req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes;
memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN);
req.numOfSupportVnodes = pDnode->numOfSupportVnodes;
memcpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN);
req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0;
......@@ -59,23 +57,26 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
}
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->dnodeId == 0) {
SDnode *pDnode = pMgmt->pDnode;
if (pDnode->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
pDnode->dnodeId = pCfg->dnodeId;
pDnode->clusterId = pCfg->clusterId;
dmWriteFile(pMgmt);
taosWUnLockLatch(&pMgmt->latch);
}
}
void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, DNODE)->pMgmt;
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pRsp = &pMsg->rpcMsg;
if (pRsp->code != TSDB_CODE_SUCCESS) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1;
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->dropped && pDnode->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->dnodeId);
pDnode->dropped = 1;
dmWriteFile(pMgmt);
}
} else {
......@@ -92,13 +93,22 @@ void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
pMgmt->statusSent = 0;
}
void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); }
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pRsp = &pMsg->rpcMsg;
dError("auth rsp is received, but not supported yet");
return 0;
}
void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("grant rsp is received, but not supported yet"); }
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pRsp = &pMsg->rpcMsg;
dError("grant rsp is received, but not supported yet");
return 0;
}
int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq) {
dError("config req is received, but not supported yet");
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCfgDnodeReq *pCfg = pReq->pCont;
dError("config req is received, but not supported yet");
return TSDB_CODE_OPS_NOT_SUPPORT;
}
......
......@@ -14,8 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dmWorker.h"
#include "dmMsg.h"
#include "dmInt.h"
#include "bmInt.h"
#include "mmInt.h"
......@@ -34,7 +33,7 @@ static void *dmThreadRoutine(void *param) {
while (true) {
pthread_testcancel();
taosMsleep(200);
if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) {
if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->dropped) {
continue;
}
......@@ -54,68 +53,68 @@ static void *dmThreadRoutine(void *param) {
}
}
static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) {
int32_t code = 0;
SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
dTrace("msg:%p, will be processed", pNodeMsg);
static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed", pMsg);
switch (pMsg->msgType) {
switch (msgType) {
case TDMT_DND_CREATE_MNODE:
code = mmProcessCreateReq(pDnode, pMsg);
code = mmProcessCreateReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg);
break;
case TDMT_DND_ALTER_MNODE:
code = mmProcessAlterReq(pDnode, pMsg);
code = mmProcessAlterReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg);
break;
case TDMT_DND_DROP_MNODE:
code = mmProcessDropReq(pDnode, pMsg);
code = mmProcessDropReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg);
break;
case TDMT_DND_CREATE_QNODE:
code = qmProcessCreateReq(pDnode, pMsg);
code = qmProcessCreateReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg);
break;
case TDMT_DND_DROP_QNODE:
code = qmProcessDropReq(pDnode, pMsg);
code = qmProcessDropReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg);
break;
case TDMT_DND_CREATE_SNODE:
code = smProcessCreateReq(pDnode, pMsg);
code = smProcessCreateReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg);
break;
case TDMT_DND_DROP_SNODE:
code = smProcessDropReq(pDnode, pMsg);
code = smProcessDropReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg);
break;
case TDMT_DND_CREATE_BNODE:
code = bmProcessCreateReq(pDnode, pMsg);
code = bmProcessCreateReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg);
break;
case TDMT_DND_DROP_BNODE:
code = bmProcessDropReq(pDnode, pMsg);
code = bmProcessDropReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg);
break;
case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(pDnode, pMsg);
code = dmProcessConfigReq(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg);
break;
case TDMT_MND_STATUS_RSP:
dmProcessStatusRsp(pDnode, pMsg);
code = dmProcessStatusRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg);
break;
case TDMT_MND_AUTH_RSP:
dmProcessAuthRsp(pDnode, pMsg);
code = dmProcessAuthRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg);
break;
case TDMT_MND_GRANT_RSP:
dmProcessGrantRsp(pDnode, pMsg);
code = dmProcessGrantRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType));
break;
}
if (pMsg->msgType & 1u) {
if (msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
taosFreeQitem(pNodeMsg);
dTrace("msg:%p, is freed", pNodeMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.pCont = NULL;
taosFreeQitem(pMsg);
dTrace("msg:%p, is freed", pMsg);
}
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
......
......@@ -36,10 +36,10 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
void dndDumpCfg();
void dndPrintVersion();
void dndGenerateGrant();
SDndCfg dndGetCfg();
void dndDumpCfg();
void dndPrintVersion();
void dndGenerateGrant();
SDnodeOpt dndGetOpt();
#ifdef __cplusplus
}
......
......@@ -28,7 +28,7 @@ static struct {
static void dndSigintHandle(int signum, void *info, void *ctx) {
dInfo("singal:%d is received", signum);
dndeHandleEvent(global.pDnode, DND_EVENT_STOP);
dndHandleEvent(global.pDnode, DND_EVENT_STOP);
}
static void dndSetSignalHandle() {
......@@ -71,8 +71,9 @@ static int32_t dndRunDnode() {
return -1;
}
SDndCfg objCfg = dndGetCfg();
SDnode *pDnode = dndCreate(&objCfg);
SDnodeOpt option = dndGetOpt();
SDnode *pDnode = dndCreate(&option);
if (pDnode == NULL) {
dError("failed to to create dnode object since %s", terrstr());
return -1;
......
......@@ -38,18 +38,18 @@ void dndDumpCfg() {
cfgDumpCfg(pCfg, 0, 1);
}
SDndCfg dndGetCfg() {
SConfig *pCfg = taosGetCfg();
SDndCfg objCfg = {0};
SDnodeOpt dndGetOpt() {
SConfig *pCfg = taosGetCfg();
SDnodeOpt option = {0};
objCfg.numOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
tstrncpy(objCfg.dataDir, tsDataDir, sizeof(objCfg.dataDir));
tstrncpy(objCfg.firstEp, tsFirst, sizeof(objCfg.firstEp));
tstrncpy(objCfg.secondEp, tsSecond, sizeof(objCfg.firstEp));
objCfg.serverPort = tsServerPort;
tstrncpy(objCfg.localFqdn, tsLocalFqdn, sizeof(objCfg.localFqdn));
snprintf(objCfg.localEp, sizeof(objCfg.localEp), "%s:%u", objCfg.localFqdn, objCfg.serverPort);
objCfg.pDisks = tsDiskCfg;
objCfg.numOfDisks = tsDiskCfgNum;
return objCfg;
option.numOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
tstrncpy(option.dataDir, tsDataDir, sizeof(option.dataDir));
tstrncpy(option.firstEp, tsFirst, sizeof(option.firstEp));
tstrncpy(option.secondEp, tsSecond, sizeof(option.firstEp));
option.serverPort = tsServerPort;
tstrncpy(option.localFqdn, tsLocalFqdn, sizeof(option.localFqdn));
snprintf(option.localEp, sizeof(option.localEp), "%s:%u", option.localFqdn, option.serverPort);
option.pDisks = tsDiskCfg;
option.numOfDisks = tsDiskCfgNum;
return option;
}
......@@ -54,9 +54,9 @@ int32_t mmDrop(SMnodeMgmt *pMgmt);
int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate);
// mmHandle.h
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -24,9 +24,9 @@ extern "C" {
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
// int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
// int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
// int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -28,8 +28,8 @@ int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -142,11 +142,10 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue;
pOption->putReqToMReadQFp = mmPutMsgToReadQueue;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
}
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
......@@ -157,8 +156,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1;
pReplica->port = pDnode->cfg.serverPort;
memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
pReplica->port = pDnode->serverPort;
memcpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN);
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
......@@ -176,8 +175,8 @@ int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnod
SDnode *pDnode = pMgmt->pDnode;
mmInitOption(pMgmt, pOption);
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
pOption->replica = pCreate->replica;
pOption->selfIndex = -1;
......@@ -259,15 +258,15 @@ _OVER:
}
static bool mmDeployRequired(SDnode *pDnode) {
if (dmGetDnodeId(pDnode) > 0) {
if (pDnode->dnodeId > 0) {
return false;
}
if (dmGetClusterId(pDnode) > 0) {
if (pDnode->clusterId > 0) {
return false;
}
if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) {
if (strcmp(pDnode->localEp, pDnode->firstEp) != 0) {
return false;
}
......
......@@ -18,9 +18,9 @@
#include "dmInt.h"
#include "mmWorker.h"
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
......@@ -28,7 +28,7 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.replica <= 1 || createReq.dnodeId != dmGetDnodeId(pDnode)) {
if (createReq.replica <= 1 || createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
......@@ -53,9 +53,9 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
return mmOpen(pMgmt, &option);
}
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
......@@ -63,7 +63,7 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (alterReq.dnodeId != dmGetDnodeId(pDnode)) {
if (alterReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr());
return -1;
......@@ -90,9 +90,9 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) {
return code;
}
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropMnodeReq dropReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......@@ -100,7 +100,7 @@ int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr());
return -1;
......
......@@ -125,14 +125,16 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker
return code;
}
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc);
int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpc) {
// SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
// SMnodeMgmt *pMgmt = pWrapper->pMgmt;
// return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc);
return 0;
}
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc);
int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpc) {
// SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE);
// SMnodeMgmt *pMgmt = pWrapper->pMgmt;
// return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc);
return 0;
}
......@@ -45,8 +45,8 @@ void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
// qmHandle.h
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -23,8 +23,8 @@ extern "C" {
#endif
void qmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -185,9 +185,9 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
pOption->sver = tsVersion;
}
......@@ -274,7 +274,7 @@ int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.dnodeId != dmGetDnodeId(pDnode)) {
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
return -1;
......@@ -290,7 +290,7 @@ int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
return -1;
......
......@@ -17,8 +17,8 @@
#include "qmMsg.h"
#include "qmWorker.h"
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg){return 0;}
int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;}
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg){return 0;}
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
}
......
......@@ -39,8 +39,8 @@ int32_t dndInitSnode(SDnode *pDnode);
void dndCleanupSnode(SDnode *pDnode);
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -210,9 +210,9 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
pOption->sver = tsVersion;
}
......@@ -299,7 +299,7 @@ int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.dnodeId != dmGetDnodeId(pDnode)) {
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr());
return -1;
......@@ -315,7 +315,7 @@ int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
return -1;
......
......@@ -17,8 +17,8 @@
#include "smMsg.h"
#include "smWorker.h"
int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;}
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;}
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
}
......@@ -24,7 +24,7 @@ class TestServer {
bool DoStart();
private:
SDndCfg BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
SDnodeOpt BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
private:
SDnode* pDnode;
......
......@@ -22,22 +22,22 @@ void* serverLoop(void* param) {
}
}
SDndCfg TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SDndCfg cfg = {0};
cfg.numOfSupportVnodes = 16;
cfg.serverPort = port;
strcpy(cfg.dataDir, path);
snprintf(cfg.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
snprintf(cfg.localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
snprintf(cfg.firstEp, TSDB_EP_LEN, "%s", firstEp);
return cfg;
SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SDnodeOpt option = {0};
option.numOfSupportVnodes = 16;
option.serverPort = port;
strcpy(option.dataDir, path);
snprintf(option.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
snprintf(option.localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
snprintf(option.firstEp, TSDB_EP_LEN, "%s", firstEp);
return option;
}
bool TestServer::DoStart() {
SDndCfg cfg = BuildOption(path, fqdn, port, firstEp);
SDnodeOpt option = BuildOption(path, fqdn, port, firstEp);
taosMkDir(path);
pDnode = dndCreate(&cfg);
pDnode = dndCreate(&option);
if (pDnode != NULL) {
return false;
}
......
......@@ -85,10 +85,10 @@ typedef struct {
} SVnodeThread;
// interface
void vmGetMgmtFp(SMgmtWrapper *pWrapper);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
void vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
void vmGetMgmtFp(SMgmtWrapper *pWrapper);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
// vmInt.h
SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId);
......
......@@ -279,11 +279,11 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
taosInitRWLatch(&pMgmt->latch);
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
tstrncpy(dCfg.dir, pDnode->dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
SDiskCfg *pDisks = pDnode->cfg.pDisks;
int32_t numOfDisks = pDnode->cfg.numOfDisks;
SDiskCfg *pDisks = pDnode->pDisks;
int32_t numOfDisks = pDnode->numOfDisks;
if (numOfDisks <= 0 || pDisks == NULL) {
pDisks = &dCfg;
numOfDisks = 1;
......@@ -342,11 +342,11 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper->fp = mgmtFp;
}
void vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) {
int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
if (pMgmt == NULL) return -1;
tfsGetMonitorInfo(pMgmt->pTfs, pInfo);
return tfsGetMonitorInfo(pMgmt->pTfs, pInfo);
}
void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) {
......
......@@ -70,7 +70,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
SWrapperCfg wrapperCfg = {0};
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
if (createReq.dnodeId != dmGetDnodeId(pMgmt->pDnode)) {
if (createReq.dnodeId != pMgmt->pDnode->dnodeId) {
terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
return -1;
......
......@@ -119,11 +119,10 @@ typedef struct SMnode {
SHashObj *infosMeta;
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SendReqToDnodeFp sendReqFp;
SendReqToMnodeFp sendReqToMnodeFp;
SendRedirectRspFp sendRedirectRspFp;
PutReqToMWriteQFp putReqToMWriteQFp;
PutReqToMReadQFp putReqToMReadQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendReqToMnodeFp;
PutToQueueFp putReqToMWriteQFp;
PutToQueueFp putReqToMReadQFp;
} SMnode;
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
......
......@@ -291,9 +291,8 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp;
pMnode->sendReqFp = pOption->sendReqFp;
pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;
if (pMnode->sendReqFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL ||
if (pMnode->sendReqFp == NULL || pMnode->sendReqToMnodeFp == NULL ||
pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1;
......
......@@ -389,7 +389,7 @@ static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg) {
}
static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
char dirName[TSDB_FILENAME_LEN] = "\0";
char dirName[TSDB_FILENAME_LEN] = "\0";
if (pCfg->level < 0 || pCfg->level >= TFS_MAX_TIERS) {
fError("failed to mount %s to FS since invalid level %d", pCfg->dir, pCfg->level);
......@@ -539,9 +539,9 @@ static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter) {
return pDisk;
}
void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
pInfo->datadirs = taosArrayInit(32, sizeof(SMonDiskDesc));
if (pInfo->datadirs == NULL) return;
if (pInfo->datadirs == NULL) return -1;
tfsUpdateSize(pTfs);
......@@ -558,4 +558,6 @@ void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
}
}
tfsUnLock(pTfs);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册