提交 504a3906 编写于 作者: S Shengliang Guan

Merge branch 'feature/dnode3' into 3.0

......@@ -28,10 +28,6 @@ extern char tsSecond[];
extern char tsLocalFqdn[];
extern char tsLocalEp[];
extern uint16_t tsServerPort;
extern uint16_t tsDnodeShellPort;
extern uint16_t tsDnodeDnodePort;
extern uint16_t tsSyncPort;
extern uint16_t tsArbitratorPort;
extern int32_t tsStatusInterval;
extern int32_t tsNumOfMnodes;
extern int8_t tsEnableVnodeBak;
......
......@@ -78,7 +78,7 @@ typedef struct {
* @brief data file's directory.
*
*/
char dataDir[PATH_MAX];
char dataDir[TSDB_FILENAME_LEN];
/**
* @brief local endpoint.
......@@ -121,10 +121,10 @@ typedef struct {
/**
* @brief Initialize and start the dnode.
*
* @param pOptions Options of the dnode.
* @param pOption Option of the dnode.
* @return SDnode* The dnode object.
*/
SDnode *dndInit(SDnodeOpt *pOptions);
SDnode *dndInit(SDnodeOpt *pOption);
/**
* @brief Stop and cleanup the dnode.
......
......@@ -24,10 +24,10 @@ extern "C" {
typedef struct SDnode SDnode;
typedef struct SMnode SMnode;
typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg);
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg);
typedef struct SMnodeLoad {
int64_t numOfDnode;
......@@ -53,17 +53,17 @@ typedef struct {
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp;
} SMnodeOptions;
} SMnodeOpt;
/* ------------------------ SMnode ------------------------ */
/**
* @brief Open a mnode.
*
* @param path Path of the mnode
* @param pOptions Options of the mnode
* @param pOption Option of the mnode
* @return SMnode* The mnode object
*/
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions);
SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption);
/**
* @brief Close a mnode
......@@ -76,10 +76,10 @@ void mnodeClose(SMnode *pMnode);
* @brief Close a mnode
*
* @param pMnode The mnode object to close
* @param pOptions Options of the mnode
* @param pOption Options of the mnode
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions);
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption);
/**
* @brief Drop a mnode.
......
......@@ -184,27 +184,9 @@ typedef struct {
SRpcMsg rpcMsg[];
} SVnodeMsg;
typedef struct SDnode SDnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg);
typedef struct {
PutMsgToVnodeQFp putMsgToApplyQueueFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
} SVnodePara;
int32_t vnodeInit(SVnodePara);
void vnodeCleanup();
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg);
void vnodeDrop(SVnode *pVnode);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
......
......@@ -358,12 +358,6 @@ do { \
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_PORT_DNODESHELL 0
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
#define TSDB_PORT_HTTP 11
#define TSDB_PORT_ARBITRATOR 12
#define TSDB_MAX_WAL_SIZE (1024*1024*3)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
......
......@@ -33,10 +33,6 @@ char tsArbitrator[TSDB_EP_LEN] = {0};
char tsLocalFqdn[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030;
uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035]
uint16_t tsDnodeDnodePort = 6035; // udp/tcp
uint16_t tsSyncPort = 6040;
uint16_t tsArbitratorPort = 6042;
int32_t tsStatusInterval = 1; // second
int32_t tsNumOfMnodes = 1;
int8_t tsEnableVnodeBak = 1;
......@@ -1726,11 +1722,6 @@ int32_t taosCheckGlobalCfg() {
}
}
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
tsHttpPort = tsServerPort + TSDB_PORT_HTTP;
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
}
......
......@@ -141,13 +141,13 @@ void dmnInitOption(SDnodeOpt *pOption) {
pOption->shellActivityTimer = tsShellActivityTimer;
pOption->statusInterval = tsStatusInterval;
pOption->serverPort = tsServerPort;
tstrncpy(pOption->dataDir, tsDataDir, TSDB_EP_LEN);
tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN);
tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(pOption->localFqdn, tsLocalEp, TSDB_FQDN_LEN);
tstrncpy(pOption->firstEp, tsFirst, TSDB_FQDN_LEN);
tstrncpy(pOption->timezone, tsLocalEp, TSDB_TIMEZONE_LEN);
tstrncpy(pOption->locale, tsLocalEp, TSDB_LOCALE_LEN);
tstrncpy(pOption->charset, tsLocalEp, TSDB_LOCALE_LEN);
tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN);
tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN);
tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN);
}
int dmnRunDnode() {
......
......@@ -21,16 +21,16 @@ extern "C" {
#endif
#include "dndInt.h"
int32_t dndInitDnode(SDnode *pDnd);
void dndCleanupDnode(SDnode *pDnd);
void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndInitDnode(SDnode *pDnode);
void dndCleanupDnode(SDnode *pDnode);
void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnd);
int64_t dndGetClusterId(SDnode *pDnd);
void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell);
int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -31,18 +31,19 @@ extern "C" {
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "dnode.h"
#include "mnode.h"
#include "vnode.h"
#include "dnode.h"
extern int32_t dDebugFlag;
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("SRV FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("SRV ERROR ", 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("SRV WARN ", 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("SRV ", 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
......@@ -54,17 +55,16 @@ typedef struct {
} SDnodeDir;
typedef struct {
int32_t dnodeId;
uint32_t rebootTime;
int32_t dropped;
int64_t clusterId;
SEpSet shellEpSet;
SEpSet peerEpSet;
char *file;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
pthread_mutex_t mutex;
int32_t dnodeId;
int32_t dropped;
uint32_t rebootTime;
int64_t clusterId;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
SRWLatch latch;
} SDnodeMgmt;
typedef struct {
......@@ -108,10 +108,10 @@ typedef struct SDnode {
EStat stat;
SDnodeOpt opt;
SDnodeDir dir;
SDnodeMgmt d;
SMnodeMgmt m;
SDnodeMgmt dmgmt;
SMnodeMgmt mmgmt;
SVnodesMgmt vmgmt;
STransMgmt t;
STransMgmt tmgmt;
SStartupMsg startup;
} SDnode;
......@@ -119,7 +119,7 @@ EStat dndGetStat(SDnode *pDnode);
void dndSetStat(SDnode *pDnode, EStat stat);
char *dndStatStr(EStat stat);
void dndReportStartup(SDnode *pDnode, char *name, char *desc);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
#ifdef __cplusplus
......
......@@ -58,8 +58,8 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode);
static int32_t dndReadMnodeFile(SDnode *pDnode);
static int32_t dndWriteMnodeFile(SDnode *pDnode);
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions);
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndDropMnode(SDnode *pDnode);
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
......@@ -67,7 +67,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static SMnode *dndAcquireMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = NULL;
int32_t refCount = 0;
......@@ -85,7 +85,7 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) {
}
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
......@@ -98,7 +98,7 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
}
static int32_t dndReadMnodeFile(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 300;
......@@ -152,7 +152,7 @@ PRASE_MNODE_OVER:
}
static int32_t dndWriteMnodeFile(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
char file[PATH_MAX + 20] = {0};
snprintf(file, sizeof(file), "%s.bak", pMgmt->file);
......@@ -212,7 +212,7 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) {
}
static void dndStopMnodeWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
......@@ -243,6 +243,7 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
if (dndGetClusterId(pDnode) > 0) {
return false;
}
if (strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) != 0) {
return false;
}
......@@ -250,43 +251,49 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
return true;
}
static void dndInitMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions) {
pOptions->pDnode = pDnode;
pOptions->sendMsgToDnodeFp = dndSendMsgToDnode;
pOptions->sendMsgToMnodeFp = dndSendMsgToMnode;
pOptions->sendRedirectMsgFp = dndSendRedirectMsg;
pOptions->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
}
static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCreateMnodeMsg *pMsg) {
dndInitMnodeOptions(pDnode, pOptions);
if (pMsg == NULL) {
pOptions->dnodeId = 1;
pOptions->clusterId = 1234;
pOptions->replica = 1;
pOptions->selfIndex = 0;
SReplica *pReplica = &pOptions->replicas[0];
pReplica->id = 1;
pReplica->port = pDnode->opt.serverPort;
tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
} else {
pOptions->dnodeId = dndGetDnodeId(pDnode);
pOptions->clusterId = dndGetClusterId(pDnode);
pOptions->selfIndex = -1;
pOptions->replica = pMsg->replica;
for (int32_t index = 0; index < pMsg->replica; ++index) {
SReplica *pReplica = &pOptions->replicas[index];
pReplica->id = pMsg->replicas[index].id;
pReplica->port = pMsg->replicas[index].port;
tstrncpy(pReplica->fqdn, pMsg->replicas[index].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOptions->dnodeId) {
pOptions->selfIndex = index;
}
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
pOption->sendRedirectMsgFp = dndSendRedirectMsg;
pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
}
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
dndInitMnodeOption(pDnode, pOption);
pOption->replica = 1;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1;
pReplica->port = pDnode->opt.serverPort;
tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
}
static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
dndInitMnodeOption(pDnode, pOption);
pOption->replica = 0;
}
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeMsg *pMsg) {
dndInitMnodeOption(pDnode, pOption);
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->replica = pMsg->replica;
pOption->selfIndex = -1;
for (int32_t index = 0; index < pMsg->replica; ++index) {
SReplica *pReplica = &pOption->replicas[index];
pReplica->id = pMsg->replicas[index].id;
pReplica->port = pMsg->replicas[index].port;
tstrncpy(pReplica->fqdn, pMsg->replicas[index].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) {
pOption->selfIndex = index;
}
}
if (pOptions->selfIndex == -1) {
if (pOption->selfIndex == -1) {
terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
dError("failed to build mnode options since %s", terrstr());
return -1;
......@@ -295,8 +302,8 @@ static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCr
return 0;
}
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
SMnodeMgmt *pMgmt = &pDnode->m;
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = dndStartMnodeWorker(pDnode);
if (code != 0) {
......@@ -304,7 +311,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
return code;
}
SMnode *pMnode = mnodeOpen(pDnode->dir.mnode, pOptions);
SMnode *pMnode = mnodeOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
code = terrno;
......@@ -331,8 +338,8 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
return 0;
}
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
SMnodeMgmt *pMgmt = &pDnode->m;
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) {
......@@ -340,7 +347,7 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
return -1;
}
if (mnodeAlter(pMnode, pOptions) != 0) {
if (mnodeAlter(pMnode, pOption) != 0) {
dError("failed to alter mnode since %s", terrstr());
dndReleaseMnode(pDnode, pMnode);
return -1;
......@@ -351,7 +358,7 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
}
static int32_t dndDropMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) {
......@@ -399,8 +406,8 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1;
} else {
SMnodeOptions option = {0};
if (dndBuildMnodeOptions(pDnode, &option, pMsg) != 0) {
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
return dndOpenMnode(pDnode, &option);
......@@ -414,8 +421,8 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1;
} else {
SMnodeOptions option = {0};
if (dndBuildMnodeOptions(pDnode, &option, pMsg) != 0) {
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
return dndAlterMnode(pDnode, &option);
......@@ -458,7 +465,7 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
}
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
......@@ -472,7 +479,7 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
......@@ -486,7 +493,7 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
......@@ -500,7 +507,7 @@ static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
......@@ -532,7 +539,7 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs
}
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
......@@ -545,7 +552,7 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
}
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
......@@ -557,7 +564,7 @@ void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
......@@ -569,7 +576,7 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
......@@ -581,7 +588,7 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) {
......@@ -594,7 +601,7 @@ static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
}
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, NULL, (FProcessItem)dndProcessMnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -604,19 +611,19 @@ static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
}
static void dndFreeMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
pMgmt->pMgmtQ = NULL;
}
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "mnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -624,13 +631,12 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
}
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->mgmtPool);
}
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, NULL, (FProcessItem)dndProcessMnodeReadQueue);
if (pMgmt->pReadQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -640,19 +646,19 @@ static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
}
static void dndFreeMnodeReadQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->readPool, pMgmt->pReadQ);
pMgmt->pReadQ = NULL;
}
static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->readPool;
pPool->name = "mnode-read";
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -660,12 +666,12 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
}
static void dndCleanupMnodeReadWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->readPool);
}
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeWriteQueue);
if (pMgmt->pWriteQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -675,13 +681,13 @@ static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
}
static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pWriteQ);
pMgmt->pWriteQ = NULL;
}
static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeApplyQueue);
if (pMgmt->pApplyQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -691,19 +697,19 @@ static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) {
}
static void dndFreeMnodeApplyQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pApplyQ);
pMgmt->pApplyQ = NULL;
}
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->writePool;
pPool->name = "mnode-write";
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -711,12 +717,12 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
}
static void dndCleanupMnodeWriteWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->writePool);
}
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, NULL, (FProcessItem)dndProcessMnodeSyncQueue);
if (pMgmt->pSyncQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -726,28 +732,33 @@ static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
}
static void dndFreeMnodeSyncQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->syncPool, pMgmt->pSyncQ);
pMgmt->pSyncQ = NULL;
}
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->syncPool;
pPool->name = "mnode-sync";
pPool->min = 0;
pPool->max = 1;
return tWorkerInit(pPool);
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndCleanupMnodeSyncWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->syncPool);
}
int32_t dndInitMnode(SDnode *pDnode) {
dInfo("dnode-mnode start to init");
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosInitRWLatch(&pMgmt->latch);
if (dndInitMnodeMgmtWorker(pDnode) != 0) {
......@@ -781,17 +792,19 @@ int32_t dndInitMnode(SDnode *pDnode) {
}
dInfo("start to deploy mnode");
SMnodeOpt option = {0};
dndBuildMnodeDeployOption(pDnode, &option);
return dndOpenMnode(pDnode, &option);
} else {
dInfo("start to open mnode");
SMnodeOpt option = {0};
dndBuildMnodeOpenOption(pDnode, &option);
return dndOpenMnode(pDnode, &option);
}
SMnodeOptions option = {0};
dndInitMnodeOptions(pDnode, &option);
return dndOpenMnode(pDnode, &option);
}
void dndCleanupMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
dInfo("dnode-mnode start to clean up");
dndStopMnodeWorker(pDnode);
......@@ -801,7 +814,7 @@ void dndCleanupMnode(SDnode *pDnode) {
}
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMnodeMgmt *pMgmt = &pDnode->m;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) {
......
......@@ -25,6 +25,10 @@
#include "dndMnode.h"
#include "dndVnodes.h"
#define INTERNAL_USER "_internal"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_secret"
static void dndInitMsgFp(STransMgmt *pMgmt) {
// msg from client to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_SUBMIT] = dndProcessVnodeWriteMsg;
......@@ -121,7 +125,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
int32_t msgType = pMsg->msgType;
......@@ -143,19 +147,19 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static int32_t dndInitClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.sessions = 8;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.user = "-internal";
rpcInit.ckey = "-key";
rpcInit.secret = "-secret";
rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY;
rpcInit.secret = INTERNAL_SECRET;
pMgmt->clientRpc = rpcOpen(&rpcInit);
if (pMgmt->clientRpc == NULL) {
......@@ -167,7 +171,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
}
static void dndCleanupClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->clientRpc) {
rpcClose(pMgmt->clientRpc);
pMgmt->clientRpc = NULL;
......@@ -176,8 +180,8 @@ static void dndCleanupClient(SDnode *pDnode) {
}
static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->t;
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
......@@ -218,24 +222,56 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, INTERNAL_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
taosEncryptPass((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
return 0;
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
taosEncryptPass((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
return 0;
} else {
return -1;
}
}
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent;
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) != 0) {
if (terrno != TSDB_CODE_APP_NOT_READY) {
dTrace("failed to get user auth from mnode since %s", terrstr());
return -1;
}
if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) {
dTrace("get internal auth success");
return 0;
}
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) {
dTrace("get auth from internal mnode");
return 0;
}
if (terrno != TSDB_CODE_APP_NOT_READY) {
dTrace("failed to get user auth from internal mnode since %s", terrstr());
return -1;
}
dDebug("user:%s, send auth msg to mnodes", user);
dDebug("user:%s, send auth msg to other mnodes", user);
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, TSDB_USER_LEN);
......@@ -246,14 +282,14 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
dError("user:%s, failed to get user auth from mnodes since %s", user, terrstr());
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
dDebug("user:%s, success to get user auth from mnodes", user);
dDebug("user:%s, success to get user auth from other mnodes", user);
}
rpcFreeCont(rpcRsp.pCont);
......@@ -261,7 +297,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
}
static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
dndInitMsgFp(pMgmt);
int32_t numOfThreads = (int32_t)((pDnode->opt.numOfCores * pDnode->opt.numOfThreadsPerCore) / 2.0);
......@@ -290,7 +326,7 @@ static int32_t dndInitServer(SDnode *pDnode) {
}
static void dndCleanupServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->serverRpc) {
rpcClose(pMgmt->serverRpc);
pMgmt->serverRpc = NULL;
......@@ -317,7 +353,7 @@ void dndCleanupTrans(SDnode *pDnode) {
}
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransMgmt *pMgmt = &pDnode->t;
STransMgmt *pMgmt = &pDnode->tmgmt;
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
}
......
......@@ -22,6 +22,7 @@ typedef struct {
int32_t refCount;
int8_t dropped;
int8_t accessState;
char *path;
SVnode *pImpl;
taos_queue pWriteQ;
taos_queue pSyncQ;
......@@ -74,7 +75,7 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnode
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl);
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode);
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes);
......@@ -125,7 +126,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
}
}
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl) {
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
......@@ -139,6 +140,12 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl;
pVnode->path = tstrdup(path);
if (pVnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) {
return -1;
}
......@@ -232,7 +239,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_
snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);
fp = fopen(file, "r");
if (!fp) {
if (fp == NULL) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_VNODE_OVER;
......@@ -354,22 +361,25 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) {
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId);
SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
// SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
SVnode *pImpl = vnodeOpen(path, NULL);
if (pImpl == NULL) {
return -1;
}
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, pImpl);
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl);
if (code != 0) {
vnodeDrop(pImpl);
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
vnodeDrop(pImpl);
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
......@@ -385,7 +395,8 @@ static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
}
dndDropVnodeWrapper(pDnode, pVnode);
vnodeDrop(pVnode->pImpl);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0;
}
......@@ -413,7 +424,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dndCreateVnodeWrapper(pDnode, pVnode->vgId, pImpl);
dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->opened++;
}
......@@ -433,7 +444,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->hash == NULL) {
dError("failed to init vnode hash");
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -874,13 +885,13 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) {
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -918,6 +929,7 @@ static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
......@@ -938,7 +950,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
pPool->min = (int32_t)threadsForQuery;
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pPool = &pMgmt->fetchPool;
......@@ -946,7 +959,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores);
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
......@@ -998,7 +1012,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
pPool->name = "vnode-write";
pPool->max = tsNumOfCores;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -1036,7 +1050,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......
......@@ -20,8 +20,8 @@
#include "dndVnodes.h"
#include "sync.h"
#include "tcache.h"
#include "wal.h"
#include "tcrc32c.h"
#include "wal.h"
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
......@@ -43,10 +43,10 @@ char *dndStatStr(EStat stat) {
}
}
void dndReportStartup(SDnode *pDnode, char *name, char *desc) {
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) {
SStartupMsg *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
......@@ -61,7 +61,7 @@ static int32_t dndCheckRunning(char *dataDir) {
FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
if (fd < 0) {
dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -77,20 +77,20 @@ static int32_t dndCheckRunning(char *dataDir) {
return 0;
}
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
if (dndCheckRunning(pOptions->dataDir) != 0) {
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
if (dndCheckRunning(pOption->dataDir) != 0) {
return -1;
}
char path[PATH_MAX + 100];
snprintf(path, sizeof(path), "%s%smnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.mnode = strdup(path);
snprintf(path, sizeof(path), "%s%smnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.mnode = tstrdup(path);
snprintf(path, sizeof(path), "%s%svnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = strdup(path);
snprintf(path, sizeof(path), "%s%svnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = tstrdup(path);
snprintf(path, sizeof(path), "%s%sdnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.dnode = strdup(path);
snprintf(path, sizeof(path), "%s%sdnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.dnode = tstrdup(path);
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) {
dError("failed to malloc dir object");
......@@ -116,6 +116,7 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
return -1;
}
memcpy(&pDnode->opt, pOption, sizeof(SDnodeOpt));
return 0;
}
......@@ -135,12 +136,12 @@ static void dndCleanupEnv(SDnode *pDnode) {
taosStopCacheRefreshWorker();
}
SDnode *dndInit(SDnodeOpt *pOptions) {
SDnode *dndInit(SDnodeOpt *pOption) {
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
SDnode *pDnode = calloc(1, sizeof(pDnode));
SDnode *pDnode = calloc(1, sizeof(SDnode));
if (pDnode == NULL) {
dError("failed to create dnode object");
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -150,7 +151,7 @@ SDnode *dndInit(SDnodeOpt *pOptions) {
dInfo("start to initialize TDengine");
dndSetStat(pDnode, DND_STAT_INIT);
if (dndInitEnv(pDnode, pOptions) != 0) {
if (dndInitEnv(pDnode, pOption) != 0) {
dError("failed to init env");
dndCleanup(pDnode);
return NULL;
......
......@@ -131,7 +131,7 @@ typedef struct SMnodeObj {
int64_t roleTime;
int64_t createdTime;
int64_t updateTime;
SDnodeObj *pDnd;
SDnodeObj *pDnode;
} SMnodeObj;
typedef struct {
......@@ -215,7 +215,7 @@ typedef struct SDbObj {
typedef struct {
int32_t dnodeId;
int8_t role;
SDnodeObj *pDnd;
SDnodeObj *pDnode;
} SVnodeGid;
typedef struct SVgObj {
......
......@@ -32,7 +32,7 @@ typedef struct SMnodeBak {
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
SMnodeOptions para;
SMnodeOpt para;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
} SMnodeBak;
......
......@@ -53,7 +53,7 @@ void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) {
void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) {
assert(pMnode);
(*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg, forShell);
(*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg);
}
static int32_t mnodeInitTimer() {
......@@ -77,17 +77,17 @@ static void mnodeCleanupTimer() {
tmr_h mnodeGetTimer() { return tsMint.timer; }
static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) {
pMnode->dnodeId = pOptions->dnodeId;
pMnode->clusterId = pOptions->clusterId;
pMnode->replica = pOptions->replica;
pMnode->selfIndex = pOptions->selfIndex;
memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pServer = pOptions->pDnode;
pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOptions->sendRedirectMsgFp;
static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->dnodeId = pOption->dnodeId;
pMnode->clusterId = pOption->clusterId;
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pServer = pOption->pDnode;
pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
......@@ -136,10 +136,10 @@ static int32_t mnodeAllocStartSteps() {
return 0;
}
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) {
SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption) {
SMnode *pMnode = calloc(1, sizeof(SMnode));
if (mnodeSetOptions(pMnode, pOptions) != 0) {
if (mnodeSetOptions(pMnode, pOption) != 0) {
free(pMnode);
mError("failed to init mnode options since %s", terrstr());
return NULL;
......@@ -173,7 +173,7 @@ SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) {
void mnodeClose(SMnode *pMnode) { free(pMnode); }
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions) { return 0; }
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; }
void mnodeDestroy(const char *path) { sdbUnDeploy(); }
......
......@@ -21,16 +21,5 @@ int32_t mnodeInitAuth() { return 0; }
void mnodeCleanupAuth() {}
int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, TSDB_NETTEST_USER) == 0) {
char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass);
*spi = 0;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pass, TSDB_KEY_LEN);
mDebug("nettest user is authorized");
return 0;
}
return 0;
}
\ No newline at end of file
......@@ -17,9 +17,6 @@
#include "vnodeInt.h"
#include "tqueue.h"
int32_t vnodeInit(SVnodePara para) { return 0; }
void vnodeCleanup() {}
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
void vnodeDrop(SVnode *pVnode) {}
......@@ -31,7 +28,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} else {
pMsg->allocNum = msgNum;
......@@ -41,7 +38,7 @@ SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
if (pMsg->curNum >= pMsg->allocNum) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册