diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 812cc8c870b091517659eed4d5ef42cad4a578a2..d1e27a3ef17b7c57dd922e50773f9cee902dc371 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -22,7 +22,7 @@ #include "dnode.h" #include "dnodeMClient.h" #include "dnodeModule.h" -#include "dnodeMClient.h" +#include "dnodeMgmt.h" static bool dnodeReadMnodeIpList(); static void dnodeSaveMnodeIpList(); @@ -35,7 +35,7 @@ static SRpcIpSet tsDnodeMnodeIpList = {0}; int32_t dnodeInitMClient() { if (!dnodeReadMnodeIpList()) { dTrace("failed to read mnode iplist, set it from cfg file"); - memset(&tsDnodeMnodeIpList, sizeof(SRpcIpSet), 0); + memset(&tsDnodeMnodeIpList, 0, sizeof(SRpcIpSet)); tsDnodeMnodeIpList.port = tsMnodeDnodePort; tsDnodeMnodeIpList.numOfIps = 1; tsDnodeMnodeIpList.ip[0] = inet_addr(tsMasterIp); @@ -106,13 +106,13 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { pStatusRsp->ipList.ip[i] = htonl(pStatusRsp->ipList.ip[i]); } - dTrace("status msg is received, result:%d", tstrerror(pMsg->code)); + //dTrace("status msg is received, result:%s", tstrerror(pMsg->code)); if (memcmp(&(pStatusRsp->ipList), &tsDnodeMnodeIpList, sizeof(SRpcIpSet)) != 0) { dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", pStatusRsp->ipList.numOfIps, pStatusRsp->ipList.inUse); memcpy(&tsDnodeMnodeIpList, &pStatusRsp->ipList, sizeof(SRpcIpSet)); for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; ++i) { - dPrint("mnode IP index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); + dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); } dnodeSaveMnodeIpList(); } @@ -142,7 +142,7 @@ static bool dnodeReadMnodeIpList() { int32_t value = 0; int32_t num = 0; - fscanf(fp, "%s %d", option, &value); + num = fscanf(fp, "%s %d", option, &value); if (num != 2) return false; if (strcmp(option, "inUse") != 0) return false; tsDnodeMnodeIpList.inUse = (int8_t)value;; @@ -167,7 +167,7 @@ static bool dnodeReadMnodeIpList() { fclose(fp); dPrint("read mnode iplist successed"); for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { - dPrint("mnode IP index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); + dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); } return true; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index fa915187ec9dc2f5ea49343a8316dfc13a62dbaf..cf1de5eab9574cfbebb5810349c479afe697c079 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -218,10 +218,10 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { return terrno; } - STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb); + //STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb); SVnodeObj vnodeObj = {0}; - vnodeObj.vgId = tsdbInfo->tsdbCfg.tsdbId; + vnodeObj.vgId = vnode;//tsdbInfo->tsdbCfg.tsdbId; vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; @@ -324,7 +324,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); - dPrint("vgroup:%d, vnode is created", vnodeObj.vgId); + dPrint("vgroup:%d, vnode:%d is created", vnodeObj.vgId, vnodeObj.vgId); return TSDB_CODE_SUCCESS; } @@ -359,7 +359,7 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - dTrace("vgroup:%d, start to create vnode in dnode", pCreate->cfg.vgId); + dTrace("vgroup:%d, start to create vnode:%d in dnode", pCreate->cfg.vgId, pCreate->cfg.vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); if (pVnodeObj != NULL) { @@ -439,6 +439,7 @@ static void dnodeBuildVloadMsg(char *pNode, void * param) { SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; pLoad->vgId = htonl(pVnode->vgId); + pLoad->vnode = htonl(pVnode->vgId); pLoad->status = pVnode->status; } @@ -498,7 +499,7 @@ static void dnodeReadDnodeId() { int32_t value = 0; int32_t num = 0; - fscanf(fp, "%s %d", option, &value); + num = fscanf(fp, "%s %d", option, &value); if (num != 2) return; if (strcmp(option, "dnodeId") != 0) return; tsDnodeId = value;; diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 6835f6379305f74c57855edb17bc80714128bd2e..7a2facb255c784af0b0856c24abb856a2a33eed8 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -84,13 +84,13 @@ int32_t dnodeInitModules() { } void dnodeStartModules() { - for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) { - if (tsModule[mod].num != 0 && tsModule[mod].startFp) { - if ((*tsModule[mod].startFp)() != 0) { - dError("failed to start module:%d", mod); - } - } - } + // for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) { + // if (tsModule[mod].num != 0 && tsModule[mod].startFp) { + // if ((*tsModule[mod].startFp)() != 0) { + // dError("failed to start module:%d", mod); + // } + // } + // } } void dnodeProcessModuleStatus(uint32_t moduleStatus) { diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 4cae1d5c678100e5341d50318b05899e448e64d6..35902df61723c6ae2255e4403df5744255999121 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -39,6 +39,28 @@ extern "C" { #include "ttimer.h" #include "tutil.h" +typedef struct { + int32_t mnodeId; + uint32_t privateIp; + uint32_t publicIp; + int64_t createdTime; + int64_t lostTime; + uint64_t dbVersion; + uint32_t rack; + uint16_t idc; + uint16_t slot; + int8_t role; + int8_t status; + int8_t numOfMnodes; + int32_t numOfDnodes; + char mnodeName[TSDB_DNODE_NAME_LEN + 1]; + char reserved[7]; + char updateEnd[1]; + int syncFd; + void *hbTimer; + void *pSync; +} SMnodeObj; + typedef struct { int32_t dnodeId; uint32_t privateIp; diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index 0f08664b6a7b91440dbbd6d6bb76140bff24fd74..d768d2dd7cd7ca517a02d5dcce88545b178e2851 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -22,6 +22,8 @@ extern "C" { bool mgmtCheckRedirect(void *handle); +void mgmtGetMnodeIpList(SRpcIpSet *ipSet); + int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp); int32_t mgmtRemoveMnode(uint32_t privateIp); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 8f5c7316f1f13b2ff92ea12efdada3b21f437973..c7dcefa5f4805d3535e849b0bdedf7e7d4670ed1 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -528,7 +528,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pStatus->dnodeId == 0) { pDnode = mgmtGetDnodeByIp(htonl(pStatus->privateIp)); if (pDnode == NULL) { - mTrace("dnode not created, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName); + mTrace("dnode not created, privateIp:%s", taosIpStr(htonl(pStatus->privateIp))); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); + return; + } + } else { + pDnode = mgmtGetDnode(pStatus->dnodeId); + if (pDnode == NULL) { + mError("dnode:%d, not exist, privateIp:%s", taosIpStr(pStatus->dnodeId), pStatus->dnodeName); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); return; } @@ -569,11 +576,11 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage); pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten); - SVgObj *pVgroup = mgmtGetVgroup(pStatus->load[j].vgId); + SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId); if (pVgroup == NULL) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); - mPrint("dnode:%d, vnode:%d not exist in mnode, drop it", pDnode->dnodeId, pStatus->load[j].vgId); - mgmtSendDropVnodeMsg(pStatus->load[j].vgId, &ipSet, NULL); + mPrint("dnode:%d, vnode:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); + mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); } } @@ -590,11 +597,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { return; } - pRsp->ipList = *pSdbIpList; - pRsp->ipList.port = htons(pRsp->ipList.port); - for (int i = 0; i < pRsp->ipList.numOfIps; ++i) { - pRsp->ipList.ip[i] = htonl(pRsp->ipList.ip[i]); - } + mgmtGetMnodeIpList(&pRsp->ipList); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); @@ -606,8 +609,9 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { //TODO: set vnode access SRpcMsg rpcRsp = { + .handle = rpcMsg->handle, .code = TSDB_CODE_SUCCESS, - .pCont = pStatus, + .pCont = pRsp, .contLen = contLen }; diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 00785dd0f62bfeaae2bba3e593e4e4d72729de03..dc322e742f7cc5d78ac4adf5a61c98cc8ccf181d 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -23,9 +23,9 @@ int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL; int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL; int32_t (*mgmtGetMnodesNumFp)() = NULL; -void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; +void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SMnodeObj **pMnode) = NULL; -static SSdbPeer tsMnodeObj = {0}; +static SMnodeObj tsMnodeObj = {0}; static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -57,12 +57,12 @@ static int32_t mgmtGetMnodesNum() { } } -static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { +static void *mgmtGetNextMnode(SShowObj *pShow, SMnodeObj **pMnode) { if (mgmtGetNextMnodeFp) { return (*mgmtGetNextMnodeFp)(pShow, pMnode); } else { if (*pMnode == NULL) { - *pMnode = NULL; + *pMnode = &tsMnodeObj; } else { *pMnode = NULL; } @@ -129,18 +129,19 @@ static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; int32_t cols = 0; - SSdbPeer *pMnode = NULL; + SMnodeObj *pMnode = NULL; char *pWrite; char ipstr[32]; while (numOfRows < rows) { - pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode); + pShow->pNode = mgmtGetNextMnode(pShow, (SMnodeObj **)&pMnode); if (pMnode == NULL) break; cols = 0; + tinet_ntoa(ipstr, pMnode->privateIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pMnode->ipstr); + strcpy(pWrite, ipstr); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -167,4 +168,9 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi return numOfRows; } - +void mgmtGetMnodeIpList(SRpcIpSet *ipSet) { + ipSet->inUse = 0; + ipSet->port = htons(tsMnodeDnodePort); + ipSet->numOfIps = 1; + ipSet->ip[0] = htonl(inet_addr(tsMasterIp)); +} \ No newline at end of file diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 96854ab9c047f91932c1a2d7c6bba61593c14454..be99148deb9d68596cf68cb439424ca9c2d9003e 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -165,7 +165,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mPrint("vgroup:%d, dnode:%d vnode:%d", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].dnodeId), pVgroup->vnodeGid[i].vnode); + mPrint("vgroup:%d, dnode:%d vnode:%d", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode); } pMsg->ahandle = pVgroup; @@ -559,7 +559,7 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { } void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { - mTrace("vgroup:%d, send create msg, ahandle:%p", pVgroup->vgId, ahandle); + mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle); SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .handle = ahandle,