提交 791c6c84 编写于 作者: S Shengliang Guan

adjust status msg

上级 c1605db6
......@@ -645,11 +645,10 @@ typedef struct {
int32_t sver;
int32_t dnodeId;
int32_t clusterId;
int64_t rebootTime; // time stamp for last reboot
int64_t rebootTime;
int64_t updateTime;
int16_t numOfCores;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes;
char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads;
......
......@@ -28,9 +28,7 @@ typedef struct SDnode SDnode;
typedef struct {
int32_t sver;
int16_t numOfCores;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes;
int8_t enableTelem;
int32_t statusInterval;
float numOfThreadsPerCore;
......
......@@ -56,7 +56,7 @@ typedef struct SMnodeCfg {
typedef struct {
int32_t dnodeId;
int32_t clusterId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
......
......@@ -139,9 +139,7 @@ void dmnWaitSignal() {
void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = 30000000; //3.0.0.0
pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1;
pOption->numOfSupportQnodes = 1;
pOption->statusInterval = tsStatusInterval;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
pOption->ratioOfQueryCores = tsRatioOfQueryCores;
......
......@@ -27,7 +27,7 @@ void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnode);
int32_t dndGetClusterId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
......
......@@ -22,10 +22,11 @@ extern "C" {
#include "cJSON.h"
#include "os.h"
#include "tmsg.h"
#include "tep.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#include "tqueue.h"
#include "trpc.h"
#include "tthread.h"
......@@ -51,14 +52,18 @@ typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef struct {
char *dnode;
char *mnode;
char *qnode;
char *snode;
char *bnode;
char *vnodes;
} SDnodeDir;
typedef struct {
int32_t dnodeId;
int32_t dropped;
int32_t clusterId;
int64_t clusterId;
int64_t rebootTime;
int64_t updateTime;
int8_t statusSent;
SEpSet mnodeEpSet;
char *file;
......
......@@ -17,7 +17,6 @@
#include "dndDnode.h"
#include "dndTransport.h"
#include "dndVnodes.h"
#include "tep.h"
int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
......@@ -27,10 +26,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) {
return dnodeId;
}
int32_t dndGetClusterId(SDnode *pDnode) {
int64_t dndGetClusterId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
int32_t clusterId = pMgmt->clusterId;
int64_t clusterId = pMgmt->clusterId;
taosRUnLockLatch(&pMgmt->latch);
return clusterId;
}
......@@ -68,7 +67,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, msg:%s is redirected, num:%d inUse:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) {
......@@ -82,7 +81,7 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
}
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch);
......@@ -165,7 +164,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 30000;
int32_t maxLen = 256 *1024;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
......@@ -198,11 +197,11 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
pMgmt->dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_Number) {
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pMgmt->clusterId = clusterId->valueint;
pMgmt->clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
......@@ -217,20 +216,20 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
goto PRASE_DNODE_OVER;
}
int32_t numOfNodes = cJSON_GetArraySize(dnodes);
if (numOfNodes <= 0) {
dError("failed to read %s since numOfNodes:%d invalid", pMgmt->file, numOfNodes);
int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
if (numOfDnodes <= 0) {
dError("failed to read %s since numOfDnodes:%d invalid", pMgmt->file, numOfDnodes);
goto PRASE_DNODE_OVER;
}
pMgmt->dnodeEps = calloc(1, numOfNodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
pMgmt->dnodeEps = calloc(1, numOfDnodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (pMgmt->dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_DNODE_OVER;
}
pMgmt->dnodeEps->num = numOfNodes;
pMgmt->dnodeEps->num = numOfDnodes;
for (int32_t i = 0; i < numOfNodes; ++i) {
for (int32_t i = 0; i < numOfDnodes; ++i) {
cJSON *node = cJSON_GetArrayItem(dnodes, i);
if (node == NULL) break;
......@@ -238,28 +237,28 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
cJSON *dnodeId = cJSON_GetObjectItem(node, "id");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", pMgmt->file);
dError("failed to read %s since dnodeId not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pDnodeEp->id = dnodeId->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", pMgmt->file);
dError("failed to read %s since dnodeFqdn not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s, dnodePort not found", pMgmt->file);
dError("failed to read %s since dnodePort not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pDnodeEp->port = dnodePort->valueint;
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
if (!isMnode || isMnode->type != cJSON_Number) {
dError("failed to read %s, isMnode not found", pMgmt->file);
dError("failed to read %s since isMnode not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pDnodeEp->isMnode = isMnode->valueint;
......@@ -282,7 +281,7 @@ PRASE_DNODE_OVER:
if (pMgmt->dnodeEps == NULL) {
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
pMgmt->dnodeEps->num = 1;
pMgmt->dnodeEps->eps[0].isMnode = 1;
pMgmt->dnodeEps->eps[0].isMnode = 1;
taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
}
......@@ -303,7 +302,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
}
int32_t len = 0;
int32_t maxLen = 30000;
int32_t maxLen = 256 *1024;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
......@@ -331,6 +330,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
free(content);
terrno = 0;
pMgmt->updateTime = taosGetTimestampMs();
dInfo("successed to write %s", pMgmt->file);
return 0;
}
......@@ -350,10 +350,9 @@ void dndSendStatusMsg(SDnode *pDnode) {
pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htonl(pMgmt->clusterId);
pStatus->rebootTime = htobe64(pMgmt->rebootTime);
pStatus->updateTime = htobe64(pMgmt->updateTime);
pStatus->numOfCores = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportQnodes = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfSupportVnodes);
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
......
......@@ -26,9 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p
SDnodeOpt option = {0};
option.sver = 1;
option.numOfCores = 1;
option.numOfSupportMnodes = 1;
option.numOfSupportVnodes = 1;
option.numOfSupportQnodes = 1;
option.statusInterval = 1;
option.numOfThreadsPerCore = 1;
option.ratioOfQueryCores = 1;
......
......@@ -278,10 +278,9 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htonl(pStatus->clusterId);
pStatus->rebootTime = htobe64(pStatus->rebootTime);
pStatus->updateTime = htobe64(pStatus->updateTime);
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes);
pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes);
pStatus->numOfSupportQnodes = htons(pStatus->numOfSupportQnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
}
......@@ -356,9 +355,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pDnode->rebootTime = pStatus->rebootTime;
pDnode->numOfCores = pStatus->numOfCores;
pDnode->numOfSupportMnodes = pStatus->numOfSupportMnodes;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
pDnode->numOfSupportQnodes = pStatus->numOfSupportQnodes;
pDnode->lastAccessTime = taosGetTimestampMs();
pDnode->status = DND_STATUS_READY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册