未验证 提交 7c5fdeeb 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8134 from taosdata/feature/dnode3

rename some variables
......@@ -118,7 +118,7 @@ typedef struct {
int32_t mnodeGetStatistics(SMnodeStat *stat);
/**
* Get the statistical information of Mnode.
* Get the auth information.
*
* @param user, username.
* @param spi, security parameter index.
......
......@@ -21,21 +21,21 @@ extern "C" {
#endif
#include "dnodeInt.h"
typedef struct DnCfg {
typedef struct SDnCfg {
int32_t dnodeId;
int32_t dropped;
char clusterId[TSDB_CLUSTER_ID_LEN];
char file[PATH_MAX + 20];
pthread_mutex_t mutex;
} DnCfg;
} SDnCfg;
int32_t dnodeInitCfg(DnCfg **cfg);
void dnodeCleanupCfg(DnCfg **cfg);
void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data);
int32_t dnodeGetDnodeId(DnCfg *cfg);
void dnodeGetClusterId(DnCfg *cfg, char *clusterId);
void dnodeGetCfg(DnCfg *cfg, int32_t *dnodeId, char *clusterId);
void dnodeSetDropped(DnCfg *cfg);
int32_t dnodeInitCfg(SDnCfg **cfg);
void dnodeCleanupCfg(SDnCfg **cfg);
void dnodeUpdateCfg(SDnCfg *cfg, SDnodeCfg *data);
int32_t dnodeGetDnodeId(SDnCfg *cfg);
void dnodeGetClusterId(SDnCfg *cfg, char *clusterId);
void dnodeGetCfg(SDnCfg *cfg, int32_t *dnodeId, char *clusterId);
void dnodeSetDropped(SDnCfg *cfg);
#ifdef __cplusplus
}
......
......@@ -21,11 +21,11 @@ extern "C" {
#endif
#include "dnodeInt.h"
typedef struct DnCheck {
} DnCheck;
typedef struct SDnCheck {
} SDnCheck;
int32_t dnodeInitCheck(DnCheck **check);
void dnodeCleanupCheck(DnCheck **check);
int32_t dnodeInitCheck(SDnCheck **check);
void dnodeCleanupCheck(SDnCheck **check);
#ifdef __cplusplus
}
......
......@@ -22,19 +22,19 @@ extern "C" {
#include "hash.h"
#include "dnodeInt.h"
typedef struct DnEps {
typedef struct SDnEps {
int32_t dnodeId;
int32_t dnodeNum;
SDnodeEp * dnodeList;
SHashObj * dnodeHash;
char file[PATH_MAX + 20];
pthread_mutex_t mutex;
} DnEps;
} SDnEps;
int32_t dnodeInitEps(DnEps **eps);
void dnodeCleanupEps(DnEps **eps);
void dnodeUpdateEps(DnEps *eps, SDnodeEps *data);
bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr);
int32_t dnodeInitEps(SDnEps **eps);
void dnodeCleanupEps(SDnEps **eps);
void dnodeUpdateEps(SDnEps *eps, SDnodeEps *data);
bool dnodeIsDnodeEpChanged(SDnEps *eps, int32_t dnodeId, char *epstr);
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
#ifdef __cplusplus
......
......@@ -27,32 +27,28 @@ extern "C" {
#include "tstep.h"
#include "dnode.h"
struct DnCfg;
struct DnCheck;
struct DnEps;
struct DnMnEps;
struct DnStatus;
struct DnTelem;
struct DnTrans;
struct DnMain;
struct Mnode;
struct Vnode;
struct SDnCfg;
struct SDnCheck;
struct SDnEps;
struct SDnMnEps;
struct SDnStatus;
struct SDnTelem;
struct SDnTrans;
struct SDnMain;
typedef struct Dnode {
struct SSteps * steps;
struct DnCfg * cfg;
struct DnCheck * check;
struct DnEps * eps;
struct DnMnEps * meps;
struct DnStatus *status;
struct DnTelem * telem;
struct DnTrans * trans;
struct DnMain * main;
struct Mnode * mnode;
struct Vnode * vnode;
} Dnode;
typedef struct SDnode {
struct SSteps* steps;
struct SDnCfg* cfg;
struct SDnCheck* check;
struct SDnEps* eps;
struct SDnMnEps* meps;
struct SDnStatus* status;
struct SDnTelem* telem;
struct SDnTrans* trans;
struct SDnMain* main;
} SDnode;
Dnode* dnodeInst();
SDnode* dnodeInst();
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }}
......
......@@ -27,14 +27,14 @@ typedef enum {
TD_RUN_STAT_STOPPED
} RunStat;
typedef struct DnMain {
typedef struct SDnMain {
RunStat runStatus;
void * dnodeTimer;
SStartupStep startup;
} DnMain;
} SDnMain;
int32_t dnodeInitMain(DnMain **main);
void dnodeCleanupMain(DnMain **main);
int32_t dnodeInitMain(SDnMain **main);
void dnodeCleanupMain(SDnMain **main);
int32_t dnodeInitStorage();
void dnodeCleanupStorage();
void dnodeReportStartup(char *name, char *desc);
......
......@@ -21,19 +21,19 @@ extern "C" {
#endif
#include "dnodeInt.h"
typedef struct DnMnEps {
typedef struct SDnMnEps {
SRpcEpSet mnodeEpSet;
SMInfos mnodeInfos;
char file[PATH_MAX + 20];
pthread_mutex_t mutex;
} DnMnEps;
} SDnMnEps;
int32_t dnodeInitMnodeEps(DnMnEps **meps);
void dnodeCleanupMnodeEps(DnMnEps **meps);
void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *pMinfos);
void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *pEpSet);
void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet);
void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet);
int32_t dnodeInitMnodeEps(SDnMnEps **meps);
void dnodeCleanupMnodeEps(SDnMnEps **meps);
void dnodeUpdateMnodeFromStatus(SDnMnEps *meps, SMInfos *pMinfos);
void dnodeUpdateMnodeFromPeer(SDnMnEps *meps, SRpcEpSet *pEpSet);
void dnodeGetEpSetForPeer(SDnMnEps *meps, SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SDnMnEps *meps, SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus
......
......@@ -21,14 +21,14 @@ extern "C" {
#endif
#include "dnodeInt.h"
typedef struct DnStatus {
typedef struct SDnStatus {
void * dnodeTimer;
void * statusTimer;
uint32_t rebootTime;
} DnStatus;
} SDnStatus;
int32_t dnodeInitStatus(DnStatus **status);
void dnodeCleanupStatus(DnStatus **status);
int32_t dnodeInitStatus(SDnStatus **status);
void dnodeCleanupStatus(SDnStatus **status);
void dnodeProcessStatusRsp(SRpcMsg *pMsg);
#ifdef __cplusplus
......
......@@ -25,17 +25,17 @@ extern "C" {
* sem_timedwait is NOT implemented on MacOSX
* thus we use pthread_mutex_t/pthread_cond_t to simulate
*/
typedef struct DnTelem {
typedef struct SDnTelem {
bool enable;
pthread_mutex_t lock;
pthread_cond_t cond;
volatile int32_t exit;
pthread_t thread;
char email[TSDB_FQDN_LEN];
} DnTelem;
} SDnTelem;
int32_t dnodeInitTelem(DnTelem **telem);
void dnodeCleanupTelem(DnTelem **telem);
int32_t dnodeInitTelem(SDnTelem **telem);
void dnodeCleanupTelem(SDnTelem **telem);
#ifdef __cplusplus
}
......
......@@ -23,7 +23,7 @@ extern "C" {
typedef void (*RpcMsgFp)( SRpcMsg *pMsg);
typedef struct DnTrans {
typedef struct SDnTrans {
void * serverRpc;
void * clientRpc;
void * shellRpc;
......@@ -31,11 +31,10 @@ typedef struct DnTrans {
int32_t submitReqNum;
RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX];
RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX];
} SDnTrans;
} DnTrans;
int32_t dnodeInitTrans(DnTrans **rans);
void dnodeCleanupTrans(DnTrans **trans);
int32_t dnodeInitTrans(SDnTrans **rans);
void dnodeCleanupTrans(SDnTrans **trans);
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
......
......@@ -18,7 +18,7 @@
#include "cJSON.h"
#include "dnodeCfg.h"
static int32_t dnodeReadCfg(DnCfg *cfg) {
static int32_t dnodeReadCfg(SDnCfg *cfg) {
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
......@@ -76,7 +76,7 @@ PARSE_CFG_OVER:
return 0;
}
static int32_t dnodeWriteCfg(DnCfg *cfg) {
static int32_t dnodeWriteCfg(SDnCfg *cfg) {
FILE *fp = fopen(cfg->file, "w");
if (!fp) {
dError("failed to write %s since %s", cfg->file, strerror(errno));
......@@ -103,8 +103,8 @@ static int32_t dnodeWriteCfg(DnCfg *cfg) {
return 0;
}
int32_t dnodeInitCfg(DnCfg **out) {
DnCfg* cfg = calloc(1, sizeof(DnCfg));
int32_t dnodeInitCfg(SDnCfg **out) {
SDnCfg* cfg = calloc(1, sizeof(SDnCfg));
if (cfg == NULL) return -1;
cfg->dnodeId = 0;
......@@ -127,15 +127,15 @@ int32_t dnodeInitCfg(DnCfg **out) {
return ret;
}
void dnodeCleanupCfg(DnCfg **out) {
DnCfg* cfg = *out;
void dnodeCleanupCfg(SDnCfg **out) {
SDnCfg* cfg = *out;
*out = NULL;
pthread_mutex_destroy(&cfg->mutex);
free(cfg);
}
void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data) {
void dnodeUpdateCfg(SDnCfg *cfg, SDnodeCfg *data) {
if (cfg == NULL || cfg->dnodeId == 0) return;
pthread_mutex_lock(&cfg->mutex);
......@@ -148,14 +148,14 @@ void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data) {
pthread_mutex_unlock(&cfg->mutex);
}
void dnodeSetDropped(DnCfg *cfg) {
void dnodeSetDropped(SDnCfg *cfg) {
pthread_mutex_lock(&cfg->mutex);
cfg->dropped = 1;
dnodeWriteCfg(cfg);
pthread_mutex_unlock(&cfg->mutex);
}
int32_t dnodeGetDnodeId(DnCfg *cfg) {
int32_t dnodeGetDnodeId(SDnCfg *cfg) {
int32_t dnodeId = 0;
pthread_mutex_lock(&cfg->mutex);
dnodeId = cfg->dnodeId;
......@@ -163,13 +163,13 @@ int32_t dnodeGetDnodeId(DnCfg *cfg) {
return dnodeId;
}
void dnodeGetClusterId(DnCfg *cfg, char *clusterId) {
void dnodeGetClusterId(SDnCfg *cfg, char *clusterId) {
pthread_mutex_lock(&cfg->mutex);
tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
pthread_mutex_unlock(&cfg->mutex);
}
void dnodeGetCfg(DnCfg *cfg, int32_t *dnodeId, char *clusterId) {
void dnodeGetCfg(SDnCfg *cfg, int32_t *dnodeId, char *clusterId) {
pthread_mutex_lock(&cfg->mutex);
*dnodeId = cfg->dnodeId;
tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
......
......@@ -145,8 +145,8 @@ static int32_t dnodeCheckAccess() { return 0; }
static int32_t dnodeCheckVersion() { return 0; }
static int32_t dnodeCheckDatafile() { return 0; }
int32_t dnodeInitCheck(DnCheck **out) {
DnCheck *check = calloc(1, sizeof(DnCheck));
int32_t dnodeInitCheck(SDnCheck **out) {
SDnCheck *check = calloc(1, sizeof(SDnCheck));
if (check == NULL) return -1;
*out = check;
......@@ -195,8 +195,8 @@ int32_t dnodeInitCheck(DnCheck **out) {
return 0;
}
void dnodeCleanupCheck(DnCheck **out) {
DnCheck *check = *out;
void dnodeCleanupCheck(SDnCheck **out) {
SDnCheck *check = *out;
*out = NULL;
free(check);
......
......@@ -20,7 +20,7 @@
#include "dnodeEps.h"
#include "dnodeCfg.h"
static void dnodePrintEps(DnEps *eps) {
static void dnodePrintEps(SDnEps *eps) {
dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; i++) {
SDnodeEp *ep = &eps->dnodeList[i];
......@@ -28,7 +28,7 @@ static void dnodePrintEps(DnEps *eps) {
}
}
static void dnodeResetEps(DnEps *eps, SDnodeEps *data) {
static void dnodeResetEps(SDnEps *eps, SDnodeEps *data) {
assert(data != NULL);
if (data->dnodeNum > eps->dnodeNum) {
......@@ -48,7 +48,7 @@ static void dnodeResetEps(DnEps *eps, SDnodeEps *data) {
}
}
static int32_t dnodeReadEps(DnEps *eps) {
static int32_t dnodeReadEps(SDnEps *eps) {
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
......@@ -145,7 +145,7 @@ PRASE_EPS_OVER:
return 0;
}
static int32_t dnodeWriteEps(DnEps *eps) {
static int32_t dnodeWriteEps(SDnEps *eps) {
FILE *fp = fopen(eps->file, "w");
if (!fp) {
dError("failed to write %s since %s", eps->file, strerror(errno));
......@@ -182,8 +182,8 @@ static int32_t dnodeWriteEps(DnEps *eps) {
return 0;
}
int32_t dnodeInitEps(DnEps **out) {
DnEps *eps = calloc(1, sizeof(DnEps));
int32_t dnodeInitEps(SDnEps **out) {
SDnEps *eps = calloc(1, sizeof(SDnEps));
if (eps == NULL) return -1;
eps->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
......@@ -203,8 +203,8 @@ int32_t dnodeInitEps(DnEps **out) {
return ret;
}
void dnodeCleanupEps(DnEps **out) {
DnEps *eps = *out;
void dnodeCleanupEps(SDnEps **out) {
SDnEps *eps = *out;
*out = NULL;
pthread_mutex_lock(&eps->mutex);
......@@ -225,7 +225,7 @@ void dnodeCleanupEps(DnEps **out) {
free(eps);
}
void dnodeUpdateEps(DnEps *eps, SDnodeEps *data) {
void dnodeUpdateEps(SDnEps *eps, SDnodeEps *data) {
if (data == NULL || data->dnodeNum <= 0) return;
data->dnodeNum = htonl(data->dnodeNum);
......@@ -250,7 +250,7 @@ void dnodeUpdateEps(DnEps *eps, SDnodeEps *data) {
pthread_mutex_unlock(&eps->mutex);
}
bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr) {
bool dnodeIsDnodeEpChanged(SDnEps *eps, int32_t dnodeId, char *epstr) {
bool changed = false;
pthread_mutex_lock(&eps->mutex);
......@@ -269,7 +269,7 @@ bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr) {
}
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
DnEps *eps = dnodeInst()->eps;
SDnEps *eps = dnodeInst()->eps;
pthread_mutex_lock(&eps->mutex);
SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t));
......
......@@ -33,9 +33,10 @@
#include "mnode.h"
#include "vnode.h"
static Dnode tsDnode = {0};
Dnode *dnodeInst() { return &tsDnode; }
SDnode *dnodeInst() {
static SDnode inst = {0};
return &inst;
}
static int32_t dnodeInitVnodeModule(void **unused) {
SVnodePara para;
......@@ -47,7 +48,7 @@ static int32_t dnodeInitVnodeModule(void **unused) {
}
static int32_t dnodeInitMnodeModule(void **unused) {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
SMnodePara para;
para.fp.GetDnodeEp = dnodeGetDnodeEp;
......@@ -64,7 +65,7 @@ int32_t dnodeInit() {
struct SSteps *steps = taosStepInit(24, dnodeReportStartup);
if (steps == NULL) return -1;
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
taosStepAdd(steps, "dnode-main", (void **)&dnode->main, (InitFp)dnodeInitMain, (CleanupFp)dnodeCleanupMain);
taosStepAdd(steps, "dnode-storage", NULL, (InitFp)dnodeInitStorage, (CleanupFp)dnodeCleanupStorage);
......@@ -96,7 +97,7 @@ int32_t dnodeInit() {
}
void dnodeCleanup() {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
if (dnode->main->runStatus != TD_RUN_STAT_STOPPED) {
dnode->main->runStatus = TD_RUN_STAT_STOPPED;
taosStepCleanup(dnode->steps);
......
......@@ -55,8 +55,8 @@ void dnodePrintDiskInfo() {
dInfo("==================================");
}
int32_t dnodeInitMain(DnMain **out) {
DnMain* main = calloc(1, sizeof(DnMain));
int32_t dnodeInitMain(SDnMain **out) {
SDnMain* main = calloc(1, sizeof(SDnMain));
if (main == NULL) return -1;
main->runStatus = TD_RUN_STAT_STOPPED;
......@@ -101,8 +101,8 @@ int32_t dnodeInitMain(DnMain **out) {
return taosCheckGlobalCfg();
}
void dnodeCleanupMain(DnMain **out) {
DnMain *main = *out;
void dnodeCleanupMain(SDnMain **out) {
SDnMain *main = *out;
*out = NULL;
if (main->dnodeTimer != NULL) {
......@@ -202,7 +202,7 @@ void dnodeCleanupStorage() {
}
void dnodeReportStartup(char *name, char *desc) {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
if (dnode->main != NULL) {
SStartupStep *startup = &dnode->main->startup;
tstrncpy(startup->name, name, strlen(startup->name));
......@@ -212,7 +212,7 @@ void dnodeReportStartup(char *name, char *desc) {
}
void dnodeReportStartupFinished(char *name, char *desc) {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
SStartupStep *startup = &dnode->main->startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
......@@ -222,7 +222,7 @@ void dnodeReportStartupFinished(char *name, char *desc) {
void dnodeProcessStartupReq(SRpcMsg *pMsg) {
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep));
......@@ -234,7 +234,7 @@ void dnodeProcessStartupReq(SRpcMsg *pMsg) {
}
static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnode->cfg->dnodeId) {
......@@ -254,7 +254,7 @@ static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
}
if (mnodeIsServing(dnode->mnode)) return 0;
if (mnodeIsServing()) return 0;
return mnodeDeploy(&pCfg->mnodes);
}
......
......@@ -22,7 +22,7 @@
#include "dnodeMnodeEps.h"
#include "mnode.h"
static void dnodePrintMnodeEps(DnMnEps *meps) {
static void dnodePrintMnodeEps(SDnMnEps *meps) {
SRpcEpSet *epset = &meps->mnodeEpSet;
dInfo("print mnode eps, num:%d inuse:%d", epset->numOfEps, epset->inUse);
for (int32_t i = 0; i < epset->numOfEps; i++) {
......@@ -30,7 +30,7 @@ static void dnodePrintMnodeEps(DnMnEps *meps) {
}
}
static void dnodeResetMnodeEps(DnMnEps *meps, SMInfos *mInfos) {
static void dnodeResetMnodeEps(SDnMnEps *meps, SMInfos *mInfos) {
if (mInfos == NULL || mInfos->mnodeNum == 0) {
meps->mnodeEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, meps->mnodeEpSet.fqdn[0], &meps->mnodeEpSet.port[0]);
......@@ -55,7 +55,7 @@ static void dnodeResetMnodeEps(DnMnEps *meps, SMInfos *mInfos) {
dnodePrintMnodeEps(meps);
}
static int32_t dnodeWriteMnodeEps(DnMnEps *meps) {
static int32_t dnodeWriteMnodeEps(SDnMnEps *meps) {
FILE *fp = fopen(meps->file, "w");
if (!fp) {
dError("failed to write %s since %s", meps->file, strerror(errno));
......@@ -91,7 +91,7 @@ static int32_t dnodeWriteMnodeEps(DnMnEps *meps) {
return 0;
}
static int32_t dnodeReadMnodeEps(DnMnEps *meps, DnEps *deps) {
static int32_t dnodeReadMnodeEps(SDnMnEps *meps, SDnEps *deps) {
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
......@@ -192,7 +192,7 @@ PARSE_MINFOS_OVER:
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
DnMnEps *meps = dnodeInst()->meps;
SDnMnEps *meps = dnodeInst()->meps;
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
......@@ -222,8 +222,8 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
}
int32_t dnodeInitMnodeEps(DnMnEps **out) {
DnMnEps *meps = calloc(1, sizeof(DnMnEps));
int32_t dnodeInitMnodeEps(SDnMnEps **out) {
SDnMnEps *meps = calloc(1, sizeof(SDnMnEps));
if (meps == NULL) return -1;
snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir);
......@@ -239,8 +239,8 @@ int32_t dnodeInitMnodeEps(DnMnEps **out) {
return ret;
}
void dnodeCleanupMnodeEps(DnMnEps **out) {
DnMnEps *meps = *out;
void dnodeCleanupMnodeEps(SDnMnEps **out) {
SDnMnEps *meps = *out;
*out = NULL;
if (meps != NULL) {
......@@ -249,7 +249,7 @@ void dnodeCleanupMnodeEps(DnMnEps **out) {
}
}
void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *mInfos) {
void dnodeUpdateMnodeFromStatus(SDnMnEps *meps, SMInfos *mInfos) {
if (mInfos->mnodeNum <= 0 || mInfos->mnodeNum > TSDB_MAX_REPLICA) {
dError("invalid mInfos since num:%d invalid", mInfos->mnodeNum);
return;
......@@ -278,7 +278,7 @@ void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *mInfos) {
pthread_mutex_unlock(&meps->mutex);
}
void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *ep) {
void dnodeUpdateMnodeFromPeer(SDnMnEps *meps, SRpcEpSet *ep) {
if (ep->numOfEps <= 0) {
dError("mInfos is changed, but content is invalid, discard it");
return;
......@@ -296,7 +296,7 @@ void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *ep) {
pthread_mutex_unlock(&meps->mutex);
}
void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet) {
void dnodeGetEpSetForPeer(SDnMnEps *meps, SRpcEpSet *epSet) {
pthread_mutex_lock(&meps->mutex);
*epSet = meps->mnodeEpSet;
......@@ -307,7 +307,7 @@ void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet) {
pthread_mutex_unlock(&meps->mutex);
}
void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet) {
void dnodeGetEpSetForShell(SDnMnEps *meps, SRpcEpSet *epSet) {
pthread_mutex_lock(&meps->mutex);
*epSet = meps->mnodeEpSet;
......
......@@ -26,7 +26,7 @@
#include "vnode.h"
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
DnStatus *status = handle;
SDnStatus *status = handle;
if (status->dnodeTimer == NULL) {
dError("dnode timer is already released");
return;
......@@ -46,7 +46,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return;
}
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
dnodeGetCfg(dnode->cfg, &pStatus->dnodeId, pStatus->clusterId);
pStatus->dnodeId = htonl(dnodeGetDnodeId(dnode->cfg));
pStatus->version = htonl(tsVersion);
......@@ -86,8 +86,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
}
void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
Dnode *dnode = dnodeInst();
DnStatus *status = pMsg->ahandle;
SDnode *dnode = dnodeInst();
SDnStatus *status = pMsg->ahandle;
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
......@@ -123,8 +123,8 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer);
}
int32_t dnodeInitStatus(DnStatus **out) {
DnStatus *status = calloc(1, sizeof(DnStatus));
int32_t dnodeInitStatus(SDnStatus **out) {
SDnStatus *status = calloc(1, sizeof(SDnStatus));
if (status == NULL) return -1;
status->statusTimer = NULL;
status->dnodeTimer = dnodeInst()->main->dnodeTimer;
......@@ -135,8 +135,8 @@ int32_t dnodeInitStatus(DnStatus **out) {
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupStatus(DnStatus **out) {
DnStatus *status = *out;
void dnodeCleanupStatus(SDnStatus **out) {
SDnStatus *status = *out;
*out = NULL;
if (status->statusTimer != NULL) {
......
......@@ -154,14 +154,14 @@ static void dnodeAddMemoryInfo(SBufferWriter* bw) {
fclose(fp);
}
static void dnodeAddVersionInfo(DnTelem* telem, SBufferWriter* bw) {
static void dnodeAddVersionInfo(SDnTelem* telem, SBufferWriter* bw) {
dnodeAddStringField(bw, "version", version);
dnodeAddStringField(bw, "buildInfo", buildinfo);
dnodeAddStringField(bw, "gitInfo", gitinfo);
dnodeAddStringField(bw, "email", telem->email);
}
static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) {
static void dnodeAddRuntimeInfo(SDnTelem* telem, SBufferWriter* bw) {
SMnodeStat stat = {0};
if (mnodeGetStatistics(&stat) != 0) {
return;
......@@ -179,7 +179,7 @@ static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) {
dnodeAddIntField(bw, "compStorage", stat.compStorage);
}
static void dnodeSendTelemetryReport(DnTelem* telem) {
static void dnodeSendTelemetryReport(SDnTelem* telem) {
char buf[128] = {0};
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) {
......@@ -192,7 +192,7 @@ static void dnodeSendTelemetryReport(DnTelem* telem) {
return;
}
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
SBufferWriter bw = tbufInitWriter(NULL, false);
dnodeBeginObject(&bw);
dnodeAddStringField(&bw, "instanceId", dnode->cfg->clusterId);
......@@ -227,7 +227,7 @@ static void dnodeSendTelemetryReport(DnTelem* telem) {
}
static void* dnodeTelemThreadFp(void* param) {
DnTelem* telem = param;
SDnTelem* telem = param;
struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end);
......@@ -253,7 +253,7 @@ static void* dnodeTelemThreadFp(void* param) {
return NULL;
}
static void dnodeGetEmail(DnTelem* telem, char* filepath) {
static void dnodeGetEmail(SDnTelem* telem, char* filepath) {
int32_t fd = taosOpenFileRead(filepath);
if (fd < 0) {
return;
......@@ -266,8 +266,8 @@ static void dnodeGetEmail(DnTelem* telem, char* filepath) {
taosCloseFile(fd);
}
int32_t dnodeInitTelem(DnTelem** out) {
DnTelem* telem = calloc(1, sizeof(DnTelem));
int32_t dnodeInitTelem(SDnTelem** out) {
SDnTelem* telem = calloc(1, sizeof(SDnTelem));
if (telem == NULL) return -1;
telem->enable = tsEnableTelemetryReporting;
......@@ -296,8 +296,8 @@ int32_t dnodeInitTelem(DnTelem** out) {
return 0;
}
void dnodeCleanupTelem(DnTelem** out) {
DnTelem* telem = *out;
void dnodeCleanupTelem(SDnTelem** out) {
SDnTelem* telem = *out;
*out = NULL;
if (!telem->enable) {
......
......@@ -14,7 +14,7 @@
*/
/* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config
* dnode works as both server and client. SDnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here
*/
......@@ -30,7 +30,7 @@
#include "mnode.h"
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode * dnode = dnodeInst();
SDnode * dnode = dnodeInst();
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return;
......@@ -64,7 +64,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
}
int32_t dnodeInitServer(DnTrans *trans) {
int32_t dnodeInitServer(SDnTrans *trans) {
trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg;
......@@ -106,7 +106,7 @@ int32_t dnodeInitServer(DnTrans *trans) {
return 0;
}
void dnodeCleanupServer(DnTrans *trans) {
void dnodeCleanupServer(SDnTrans *trans) {
if (trans->serverRpc) {
rpcClose(trans->serverRpc);
trans->serverRpc = NULL;
......@@ -115,7 +115,7 @@ void dnodeCleanupServer(DnTrans *trans) {
}
static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("msg:%p is ignored since dnode is stopping", pMsg);
......@@ -141,7 +141,7 @@ static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
rpcFreeCont(pMsg->pCont);
}
int32_t dnodeInitClient(DnTrans *trans) {
int32_t dnodeInitClient(SDnTrans *trans) {
trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg;
......@@ -186,7 +186,7 @@ int32_t dnodeInitClient(DnTrans *trans) {
return 0;
}
void dnodeCleanupClient(DnTrans *trans) {
void dnodeCleanupClient(SDnTrans *trans) {
if (trans->clientRpc) {
rpcClose(trans->clientRpc);
trans->clientRpc = NULL;
......@@ -195,7 +195,7 @@ void dnodeCleanupClient(DnTrans *trans) {
}
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode * dnode = dnodeInst();
SDnode * dnode = dnodeInst();
SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return;
......@@ -213,7 +213,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return;
}
DnTrans *trans = dnode->trans;
SDnTrans *trans = dnode->trans;
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&trans->queryReqNum, 1);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
......@@ -247,26 +247,26 @@ static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secr
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL);
}
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
Dnode * dnode = dnodeInst();
SDnode * dnode = dnodeInst();
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(dnode->meps, &epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg);
}
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
Dnode * dnode = dnodeInst();
SDnode * dnode = dnodeInst();
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(dnode->meps, &epSet);
rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp);
}
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
Dnode *dnode = dnodeInst();
SDnode *dnode = dnodeInst();
rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp);
}
......@@ -303,7 +303,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
return rpcRsp.code;
}
int32_t dnodeInitShell(DnTrans *trans) {
int32_t dnodeInitShell(SDnTrans *trans) {
trans->shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
trans->shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
trans->shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
......@@ -376,15 +376,15 @@ int32_t dnodeInitShell(DnTrans *trans) {
return 0;
}
void dnodeCleanupShell(DnTrans *trans) {
void dnodeCleanupShell(SDnTrans *trans) {
if (trans->shellRpc) {
rpcClose(trans->shellRpc);
trans->shellRpc = NULL;
}
}
int32_t dnodeInitTrans(DnTrans **out) {
DnTrans *trans = calloc(1, sizeof(DnTrans));
int32_t dnodeInitTrans(SDnTrans **out) {
SDnTrans *trans = calloc(1, sizeof(SDnTrans));
if (trans == NULL) return -1;
*out = trans;
......@@ -404,8 +404,8 @@ int32_t dnodeInitTrans(DnTrans **out) {
return 0;
}
void dnodeCleanupTrans(DnTrans **out) {
DnTrans* trans = *out;
void dnodeCleanupTrans(SDnTrans **out) {
SDnTrans* trans = *out;
*out = NULL;
dnodeCleanupShell(trans);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册