提交 4662cff5 编写于 作者: S slguan

[TD-9] fix status bug in mnode

上级 0e05f4ac
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "dnode.h" #include "dnode.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeMClient.h" #include "dnodeMgmt.h"
static bool dnodeReadMnodeIpList(); static bool dnodeReadMnodeIpList();
static void dnodeSaveMnodeIpList(); static void dnodeSaveMnodeIpList();
...@@ -35,7 +35,7 @@ static SRpcIpSet tsDnodeMnodeIpList = {0}; ...@@ -35,7 +35,7 @@ static SRpcIpSet tsDnodeMnodeIpList = {0};
int32_t dnodeInitMClient() { int32_t dnodeInitMClient() {
if (!dnodeReadMnodeIpList()) { if (!dnodeReadMnodeIpList()) {
dTrace("failed to read mnode iplist, set it from cfg file"); dTrace("failed to read mnode iplist, set it from cfg file");
memset(&tsDnodeMnodeIpList, sizeof(SRpcIpSet), 0); memset(&tsDnodeMnodeIpList, 0, sizeof(SRpcIpSet));
tsDnodeMnodeIpList.port = tsMnodeDnodePort; tsDnodeMnodeIpList.port = tsMnodeDnodePort;
tsDnodeMnodeIpList.numOfIps = 1; tsDnodeMnodeIpList.numOfIps = 1;
tsDnodeMnodeIpList.ip[0] = inet_addr(tsMasterIp); tsDnodeMnodeIpList.ip[0] = inet_addr(tsMasterIp);
...@@ -106,13 +106,13 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -106,13 +106,13 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
pStatusRsp->ipList.ip[i] = htonl(pStatusRsp->ipList.ip[i]); pStatusRsp->ipList.ip[i] = htonl(pStatusRsp->ipList.ip[i]);
} }
dTrace("status msg is received, result:%d", tstrerror(pMsg->code)); //dTrace("status msg is received, result:%s", tstrerror(pMsg->code));
if (memcmp(&(pStatusRsp->ipList), &tsDnodeMnodeIpList, sizeof(SRpcIpSet)) != 0) { if (memcmp(&(pStatusRsp->ipList), &tsDnodeMnodeIpList, sizeof(SRpcIpSet)) != 0) {
dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", pStatusRsp->ipList.numOfIps, pStatusRsp->ipList.inUse); dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", pStatusRsp->ipList.numOfIps, pStatusRsp->ipList.inUse);
memcpy(&tsDnodeMnodeIpList, &pStatusRsp->ipList, sizeof(SRpcIpSet)); memcpy(&tsDnodeMnodeIpList, &pStatusRsp->ipList, sizeof(SRpcIpSet));
for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; ++i) { for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; ++i) {
dPrint("mnode IP index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i]));
} }
dnodeSaveMnodeIpList(); dnodeSaveMnodeIpList();
} }
...@@ -142,7 +142,7 @@ static bool dnodeReadMnodeIpList() { ...@@ -142,7 +142,7 @@ static bool dnodeReadMnodeIpList() {
int32_t value = 0; int32_t value = 0;
int32_t num = 0; int32_t num = 0;
fscanf(fp, "%s %d", option, &value); num = fscanf(fp, "%s %d", option, &value);
if (num != 2) return false; if (num != 2) return false;
if (strcmp(option, "inUse") != 0) return false; if (strcmp(option, "inUse") != 0) return false;
tsDnodeMnodeIpList.inUse = (int8_t)value;; tsDnodeMnodeIpList.inUse = (int8_t)value;;
...@@ -167,7 +167,7 @@ static bool dnodeReadMnodeIpList() { ...@@ -167,7 +167,7 @@ static bool dnodeReadMnodeIpList() {
fclose(fp); fclose(fp);
dPrint("read mnode iplist successed"); dPrint("read mnode iplist successed");
for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) {
dPrint("mnode IP index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i]));
} }
return true; return true;
......
...@@ -218,10 +218,10 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { ...@@ -218,10 +218,10 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
return terrno; return terrno;
} }
STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb); //STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb);
SVnodeObj vnodeObj = {0}; SVnodeObj vnodeObj = {0};
vnodeObj.vgId = tsdbInfo->tsdbCfg.tsdbId; vnodeObj.vgId = vnode;//tsdbInfo->tsdbCfg.tsdbId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
...@@ -324,7 +324,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -324,7 +324,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
dPrint("vgroup:%d, vnode is created", vnodeObj.vgId); dPrint("vgroup:%d, vnode:%d is created", vnodeObj.vgId, vnodeObj.vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -359,7 +359,7 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -359,7 +359,7 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
dTrace("vgroup:%d, start to create vnode in dnode", pCreate->cfg.vgId); dTrace("vgroup:%d, start to create vnode:%d in dnode", pCreate->cfg.vgId, pCreate->cfg.vgId);
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) { if (pVnodeObj != NULL) {
...@@ -439,6 +439,7 @@ static void dnodeBuildVloadMsg(char *pNode, void * param) { ...@@ -439,6 +439,7 @@ static void dnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId); pLoad->vgId = htonl(pVnode->vgId);
pLoad->vnode = htonl(pVnode->vgId);
pLoad->status = pVnode->status; pLoad->status = pVnode->status;
} }
...@@ -498,7 +499,7 @@ static void dnodeReadDnodeId() { ...@@ -498,7 +499,7 @@ static void dnodeReadDnodeId() {
int32_t value = 0; int32_t value = 0;
int32_t num = 0; int32_t num = 0;
fscanf(fp, "%s %d", option, &value); num = fscanf(fp, "%s %d", option, &value);
if (num != 2) return; if (num != 2) return;
if (strcmp(option, "dnodeId") != 0) return; if (strcmp(option, "dnodeId") != 0) return;
tsDnodeId = value;; tsDnodeId = value;;
......
...@@ -84,13 +84,13 @@ int32_t dnodeInitModules() { ...@@ -84,13 +84,13 @@ int32_t dnodeInitModules() {
} }
void dnodeStartModules() { void dnodeStartModules() {
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) { // for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].startFp) { // if (tsModule[mod].num != 0 && tsModule[mod].startFp) {
if ((*tsModule[mod].startFp)() != 0) { // if ((*tsModule[mod].startFp)() != 0) {
dError("failed to start module:%d", mod); // dError("failed to start module:%d", mod);
} // }
} // }
} // }
} }
void dnodeProcessModuleStatus(uint32_t moduleStatus) { void dnodeProcessModuleStatus(uint32_t moduleStatus) {
......
...@@ -39,6 +39,28 @@ extern "C" { ...@@ -39,6 +39,28 @@ extern "C" {
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
typedef struct {
int32_t mnodeId;
uint32_t privateIp;
uint32_t publicIp;
int64_t createdTime;
int64_t lostTime;
uint64_t dbVersion;
uint32_t rack;
uint16_t idc;
uint16_t slot;
int8_t role;
int8_t status;
int8_t numOfMnodes;
int32_t numOfDnodes;
char mnodeName[TSDB_DNODE_NAME_LEN + 1];
char reserved[7];
char updateEnd[1];
int syncFd;
void *hbTimer;
void *pSync;
} SMnodeObj;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
uint32_t privateIp; uint32_t privateIp;
......
...@@ -22,6 +22,8 @@ extern "C" { ...@@ -22,6 +22,8 @@ extern "C" {
bool mgmtCheckRedirect(void *handle); bool mgmtCheckRedirect(void *handle);
void mgmtGetMnodeIpList(SRpcIpSet *ipSet);
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp); int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp);
int32_t mgmtRemoveMnode(uint32_t privateIp); int32_t mgmtRemoveMnode(uint32_t privateIp);
......
...@@ -528,7 +528,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -528,7 +528,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
pDnode = mgmtGetDnodeByIp(htonl(pStatus->privateIp)); pDnode = mgmtGetDnodeByIp(htonl(pStatus->privateIp));
if (pDnode == NULL) { if (pDnode == NULL) {
mTrace("dnode not created, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName); mTrace("dnode not created, privateIp:%s", taosIpStr(htonl(pStatus->privateIp)));
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
return;
}
} else {
pDnode = mgmtGetDnode(pStatus->dnodeId);
if (pDnode == NULL) {
mError("dnode:%d, not exist, privateIp:%s", taosIpStr(pStatus->dnodeId), pStatus->dnodeName);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
return; return;
} }
...@@ -569,11 +576,11 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -569,11 +576,11 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage); pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage);
pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten); pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten);
SVgObj *pVgroup = mgmtGetVgroup(pStatus->load[j].vgId); SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
mPrint("dnode:%d, vnode:%d not exist in mnode, drop it", pDnode->dnodeId, pStatus->load[j].vgId); mPrint("dnode:%d, vnode:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pStatus->load[j].vgId, &ipSet, NULL); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
} }
} }
...@@ -590,11 +597,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -590,11 +597,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return; return;
} }
pRsp->ipList = *pSdbIpList; mgmtGetMnodeIpList(&pRsp->ipList);
pRsp->ipList.port = htons(pRsp->ipList.port);
for (int i = 0; i < pRsp->ipList.numOfIps; ++i) {
pRsp->ipList.ip[i] = htonl(pRsp->ipList.ip[i]);
}
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus);
...@@ -606,8 +609,9 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -606,8 +609,9 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
//TODO: set vnode access //TODO: set vnode access
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = rpcMsg->handle,
.code = TSDB_CODE_SUCCESS, .code = TSDB_CODE_SUCCESS,
.pCont = pStatus, .pCont = pRsp,
.contLen = contLen .contLen = contLen
}; };
......
...@@ -23,9 +23,9 @@ ...@@ -23,9 +23,9 @@
int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL; int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL;
int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL; int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL;
int32_t (*mgmtGetMnodesNumFp)() = NULL; int32_t (*mgmtGetMnodesNumFp)() = NULL;
void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SMnodeObj **pMnode) = NULL;
static SSdbPeer tsMnodeObj = {0}; static SMnodeObj tsMnodeObj = {0};
static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
...@@ -57,12 +57,12 @@ static int32_t mgmtGetMnodesNum() { ...@@ -57,12 +57,12 @@ static int32_t mgmtGetMnodesNum() {
} }
} }
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { static void *mgmtGetNextMnode(SShowObj *pShow, SMnodeObj **pMnode) {
if (mgmtGetNextMnodeFp) { if (mgmtGetNextMnodeFp) {
return (*mgmtGetNextMnodeFp)(pShow, pMnode); return (*mgmtGetNextMnodeFp)(pShow, pMnode);
} else { } else {
if (*pMnode == NULL) { if (*pMnode == NULL) {
*pMnode = NULL; *pMnode = &tsMnodeObj;
} else { } else {
*pMnode = NULL; *pMnode = NULL;
} }
...@@ -129,18 +129,19 @@ static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) ...@@ -129,18 +129,19 @@ static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn)
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;
SSdbPeer *pMnode = NULL; SMnodeObj *pMnode = NULL;
char *pWrite; char *pWrite;
char ipstr[32]; char ipstr[32];
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode); pShow->pNode = mgmtGetNextMnode(pShow, (SMnodeObj **)&pMnode);
if (pMnode == NULL) break; if (pMnode == NULL) break;
cols = 0; cols = 0;
tinet_ntoa(ipstr, pMnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pMnode->ipstr); strcpy(pWrite, ipstr);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
...@@ -167,4 +168,9 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -167,4 +168,9 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
return numOfRows; return numOfRows;
} }
void mgmtGetMnodeIpList(SRpcIpSet *ipSet) {
ipSet->inUse = 0;
ipSet->port = htons(tsMnodeDnodePort);
ipSet->numOfIps = 1;
ipSet->ip[0] = htonl(inet_addr(tsMasterIp));
}
\ No newline at end of file
...@@ -165,7 +165,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { ...@@ -165,7 +165,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("vgroup:%d, dnode:%d vnode:%d", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].dnodeId), pVgroup->vnodeGid[i].vnode); mPrint("vgroup:%d, dnode:%d vnode:%d", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode);
} }
pMsg->ahandle = pVgroup; pMsg->ahandle = pVgroup;
...@@ -559,7 +559,7 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { ...@@ -559,7 +559,7 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
} }
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
mTrace("vgroup:%d, send create msg, ahandle:%p", pVgroup->vgId, ahandle); mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle);
SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup); SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = ahandle, .handle = ahandle,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册