diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeMClient.h index 391e8da2c10382491bd089f98481977229b5b528..cab9ea9be4de577561b580e0376efcd721003370 100644 --- a/src/dnode/inc/dnodeMClient.h +++ b/src/dnode/inc/dnodeMClient.h @@ -20,8 +20,10 @@ extern "C" { #endif -int32_t dnodeInitMClient(); -void dnodeCleanupMClient(); +int32_t dnodeInitMClient(); +void dnodeCleanupMClient(); +void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); +uint32_t dnodeGetMnodeMasteIp(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index b8d01916fe0938676a09656813a91765e4b9507d..f944bd5adda6a2e9532036b1e4ad6788ec792b1d 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -23,6 +23,7 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeCleanupMgmt(); void dnodeMgmt(SRpcMsg *rpcMsg); +void dnodeUpdateDnodeId(int32_t dnodeId); void* dnodeGetVnode(int32_t vgId); int32_t dnodeGetVnodeStatus(void *pVnode); diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index 3e8e7d28f5ecc1bd8000b8f3c8dec74012108866..728630748f8b195721cd554cf590af900c49a1e8 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -23,6 +23,7 @@ extern "C" { int32_t dnodeInitModules(); void dnodeCleanUpModules(); void dnodeStartModules(); +void dnodeProcessModuleStatus(uint32_t moduleStatus); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 226a47f9752ab3270ba44f33df04b898625288d8..e3a7f99a5fbf48257d6605b903f0e24f17c67f4c 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -18,16 +18,22 @@ #include "taosmsg.h" #include "tlog.h" #include "trpc.h" +#include "tutil.h" #include "dnode.h" +#include "dnodeMClient.h" -static void (*dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static bool dnodeReadMnodeIpList(); +static void dnodeSaveMnodeIpList(); static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg); -static void *tsDnodeMClientRpc; +static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static void *tsDnodeMClientRpc = NULL; +static SRpcIpSet tsDnodeMnodeIpList = {0}; int32_t dnodeInitMClient() { - dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; - + dnodeReadMnodeIpList(); + tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; + SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; @@ -35,9 +41,12 @@ int32_t dnodeInitMClient() { rpcInit.label = "DND-MC"; rpcInit.numOfThreads = 1; rpcInit.cfp = dnodeProcessRspFromMnode; - rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; + rpcInit.sessions = 100; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.idleTime = tsShellActivityTimer * 2000; + rpcInit.user = "t"; + rpcInit.ckey = "key"; + rpcInit.secret = "secret"; tsDnodeMClientRpc = rpcOpen(&rpcInit); if (tsDnodeMClientRpc == NULL) { @@ -53,18 +62,122 @@ void dnodeCleanupMClient() { if (tsDnodeMClientRpc) { rpcClose(tsDnodeMClientRpc); tsDnodeMClientRpc = NULL; + dPrint("mnode rpc client is closed"); } } static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { - if (dnodeProcessMgmtRspFp[pMsg->msgType]) { - (*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); + if (tsDnodeProcessMgmtRspFp[pMsg->msgType]) { + (*tsDnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); } else { - dError("%s is not processed in mclient", taosMsg[pMsg->msgType]); + dError("%s is not processed in mnode rpc client", taosMsg[pMsg->msgType]); } rpcFreeCont(pMsg->pCont); } static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { + if (pMsg->code != TSDB_CODE_SUCCESS) { + dError("status rsp is received, reason:%s", tstrerror(pMsg->code)); + return; + } + + SDMStatusRsp *pStatusRsp = pMsg->pCont; + if (pStatusRsp->ipList.numOfIps <= 0) { + dError("status msg is invalid, num of ips is %d", pStatusRsp->ipList.numOfIps); + return; + } + + pStatusRsp->ipList.port = htons(pStatusRsp->ipList.port); + for (int32_t i = 0; i < pStatusRsp->ipList.numOfIps; ++i) { + pStatusRsp->ipList.ip[i] = htonl(pStatusRsp->ipList.ip[i]); + } + + dTrace("status msg is received, result:%d", tstrerror(pMsg->code)); + + if (memcmp(&(pStatusRsp->ipList), &tsDnodeMnodeIpList, sizeof(SRpcIpSet)) != 0) { + dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", pStatusRsp->ipList.numOfIps, pStatusRsp->ipList.inUse); + memcpy(&tsDnodeMnodeIpList, &pStatusRsp->ipList, sizeof(SRpcIpSet)); + for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; ++i) { + dPrint("mnode IP index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); + } + dnodeSaveMnodeIpList(); + } + + SDnodeState *pState = &pStatusRsp->dnodeState; + pState->numOfVnodes = htonl(pState->numOfVnodes); + pState->moduleStatus = htonl(pState->moduleStatus); + pState->createdTime = htonl(pState->createdTime); + pState->dnodeId = htonl(pState->dnodeId); + + dnodeProcessModuleStatus(pState->moduleStatus); + dnodeUpdateDnodeId(pState->dnodeId); } + +void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { + rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg); +} + +static bool dnodeReadMnodeIpList() { + char ipFile[TSDB_FILENAME_LEN] = {0}; + sprintf(ipFile, "%s/iplist", tsDnodeDir); + + FILE *fp = fopen(ipFile, "r"); + if (!fp) return false; + + char option[32] = {0}; + int32_t value = 0; + int32_t num = 0; + + fscanf(fp, "%s %d", option, &value); + if (num != 2) return false; + if (strcmp(option, "inUse") != 0) return false; + tsDnodeMnodeIpList.inUse = (int8_t)value;; + + num = fscanf(fp, "%s %d", option, &value); + if (num != 2) return false; + if (strcmp(option, "numOfIps") != 0) return false; + tsDnodeMnodeIpList.numOfIps = (int8_t)value; + + num = fscanf(fp, "%s %d", option, &value); + if (num != 2) return false; + if (strcmp(option, "port") != 0) return false; + tsDnodeMnodeIpList.port = (uint16_t)value; + + for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { + num = fscanf(fp, "%s %d", option, &value); + if (num != 2) return false; + if (strncmp(option, "ip", 2) != 0) return false; + tsDnodeMnodeIpList.ip[i] = (uint32_t)value; + } + + fclose(fp); + dPrint("read mnode iplist successed"); + for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { + dPrint("mnode IP index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); + } + + return true; +} + +static void dnodeSaveMnodeIpList() { + char ipFile[TSDB_FILENAME_LEN] = {0}; + sprintf(ipFile, "%s/iplist", tsDnodeDir); + + FILE *fp = fopen(ipFile, "w"); + if (!fp) return; + + fprintf(fp, "inUse %d\n", tsDnodeMnodeIpList.inUse); + fprintf(fp, "numOfIps %d\n", tsDnodeMnodeIpList.numOfIps); + fprintf(fp, "port %u\n", tsDnodeMnodeIpList.port); + for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { + fprintf(fp, "ip%d %u\n", i, tsDnodeMnodeIpList.ip[i]); + } + + fclose(fp); + dPrint("save mnode iplist successed"); +} + +uint32_t dnodeGetMnodeMasteIp() { + return tsDnodeMnodeIpList.ip[0]; +} \ No newline at end of file diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index e547faa7a042af8908d2696a4bf505d16d178f9a..caff618048b04a070874584a1dd27a7c7df868a2 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -22,16 +22,18 @@ #include "trpc.h" #include "tstatus.h" #include "tsdb.h" +#include "ttime.h" #include "ttimer.h" +#include "dnodeMClient.h" #include "dnodeMgmt.h" #include "dnodeRead.h" #include "dnodeWrite.h" typedef struct { int32_t vgId; // global vnode group ID - int32_t vnode; - int32_t status; // status: master, slave, notready, deleting int32_t refCount; // reference count + int8_t dirty; + int8_t status; // status: master, slave, notready, deleting int64_t version; void *wworker; void *rworker; @@ -48,6 +50,7 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir); static void dnodeCleanupVnode(SVnodeObj *pVnode); static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg); static void dnodeDropVnode(SVnodeObj *pVnode); +static void dnodeDoDropVnode(SVnodeObj *pVnode); static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); @@ -55,17 +58,23 @@ static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg); static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg); static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); static void dnodeSendStatusMsg(void *handle, void *tmrId); +static void dnodeReadDnodeId(); -static void * tsDnodeVnodesHash = NULL; -static void *tsDnodeTmr = NULL; -static void *tsStatusTimer = NULL; +static void *tsDnodeVnodesHash = NULL; +static void *tsDnodeTmr = NULL; +static void *tsStatusTimer = NULL; +static uint32_t tsRebootTime; +static int32_t tsDnodeId = 0; +static char tsDnodeName[TSDB_DNODE_NAME_LEN]; int32_t dnodeInitMgmt() { + dnodeReadDnodeId(); + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { @@ -73,6 +82,8 @@ int32_t dnodeInitMgmt() { return -1; } + tsRebootTime = taosGetTimestampSec(); + tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); if (tsDnodeTmr == NULL) { dError("failed to init dnode timer"); @@ -89,8 +100,16 @@ void dnodeCleanupMgmt() { tsStatusTimer = NULL; } + if (tsDnodeTmr != NULL) { + taosTmrCleanUp(tsDnodeTmr); + tsDnodeTmr = NULL; + } + dnodeCleanupVnodes(); - taosCleanUpIntHash(tsDnodeVnodesHash); + if (tsDnodeVnodesHash == NULL) { + taosCleanUpIntHash(tsDnodeVnodesHash); + tsDnodeVnodesHash = NULL; + } } void dnodeMgmt(SRpcMsg *pMsg) { @@ -106,7 +125,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcSendResponse(&rsp); } - rpcFreeCont(pMsg->pCont); // free the received message + rpcFreeCont(pMsg->pCont); } void *dnodeGetVnode(int32_t vgId) { @@ -145,8 +164,13 @@ void *dnodeGetVnodeTsdb(void *pVnode) { return ((SVnodeObj *)pVnode)->tsdb; } -void dnodeReleaseVnode(void *pVnode) { - atomic_sub_fetch_32(&((SVnodeObj *) pVnode)->refCount, 1); +void dnodeReleaseVnode(void *pVnodeRaw) { + SVnodeObj *pVnode = pVnodeRaw; + int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); + + if (count == 0 && pVnode->dirty) { + dnodeDoDropVnode(pVnode); + } } static int32_t dnodeOpenVnodes() { @@ -194,11 +218,10 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { return terrno; } - //STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb); + STsdbRepoInfo *tsdbInfo = tsdbGetStatus(pTsdb); - SVnodeObj vnodeObj; - vnodeObj.vgId = vnode; //tsdbInfo->tsdbCfg.vgId; - vnodeObj.vnode = vnode; //tsdbInfo->tsdbCfg.tsdbId; + SVnodeObj vnodeObj = {0}; + vnodeObj.vgId = tsdbInfo->tsdbCfg.tsdbId; vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; @@ -212,7 +235,7 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); - dTrace("open vnode:%d in %s", vnodeObj.vnode, rootDir); + dTrace("open vnode:%d in %s", vnodeObj.vgId, rootDir); return TSDB_CODE_SUCCESS; } @@ -241,14 +264,13 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) { pVnode->tsdb = NULL; } - dTrace("cleanup vnode:%d", pVnode->vnode); + dTrace("cleanup vnode:%d", pVnode->vgId); } static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { STsdbCfg tsdbCfg = {0}; - tsdbCfg.vgId = pVnodeCfg->cfg.vgId; tsdbCfg.precision = pVnodeCfg->cfg.precision; - tsdbCfg.tsdbId = pVnodeCfg->vnode; + tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; tsdbCfg.minRowsPerFileBlock = -1; @@ -283,13 +305,12 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); if (pTsdb == NULL) { - dError("vgroup:%d, failed to create tsdb in vnode:%d, reason:%s", pVnodeCfg->cfg.vgId, pVnodeCfg->vnode, tstrerror(terrno)); + dError("vgroup:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); return terrno; } - SVnodeObj vnodeObj; + SVnodeObj vnodeObj = {0}; vnodeObj.vgId = pVnodeCfg->cfg.vgId; - vnodeObj.vnode = pVnodeCfg->vnode; vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; @@ -303,18 +324,11 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); - dPrint("vgroup:%d, vnode:%d is created", pVnodeCfg->cfg.vgId, pVnodeCfg->vnode); + dPrint("vgroup:%d, vnode is created", vnodeObj.vgId); return TSDB_CODE_SUCCESS; } -static void dnodeDropVnode(SVnodeObj *pVnode) { - pVnode->status = TSDB_VN_STATUS_NOT_READY; - - int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); - if (count > 0) { - // wait refcount - } - +static void dnodeDoDropVnode(SVnodeObj *pVnode) { if (pVnode->tsdb) { tsdbDropRepo(pVnode->tsdb); pVnode->tsdb = NULL; @@ -324,21 +338,33 @@ static void dnodeDropVnode(SVnodeObj *pVnode) { taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); } +static void dnodeDropVnode(SVnodeObj *pVnode) { + pVnode->status = TSDB_VN_STATUS_NOT_READY; + pVnode->dirty = true; + + int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); + if (count > 0) { + dTrace("vgroup:%d, vnode will be dropped until refcount:%d is 0", pVnode->vgId, count); + return; + } + + dnodeDoDropVnode(pVnode); +} + static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; - pCreate->vnode = htonl(pCreate->vnode); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - dTrace("vgroup:%d, start to create vnode:%d", pCreate->cfg.vgId, pCreate->vnode); + dTrace("vgroup:%d, start to create vnode in dnode", pCreate->cfg.vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); if (pVnodeObj != NULL) { rpcRsp.code = TSDB_CODE_SUCCESS; - dPrint("vgroup:%d, vnode:%d is already exist", pCreate->cfg.vgId, pCreate->vnode); + dPrint("vgroup:%d, vnode is already exist", pCreate->cfg.vgId); } else { rpcRsp.code = dnodeCreateVnode(pCreate); } @@ -352,6 +378,8 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { SMDDropVnodeMsg *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); + dTrace("vgroup:%d, start to drop vnode in dnode", pDrop->vgId); + SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId); if (pVnodeObj != NULL) { dnodeDropVnode(pVnodeObj); @@ -367,11 +395,12 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; - pCreate->vnode = htonl(pCreate->vnode); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + dTrace("vgroup:%d, start to alter vnode in dnode", pCreate->cfg.vgId); + SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); if (pVnodeObj != NULL) { rpcRsp.code = TSDB_CODE_SUCCESS; @@ -401,60 +430,103 @@ static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { rpcSendResponse(&rpcRsp); } +static void dnodeBuildVloadMsg(char *pNode, void * param) { + SVnodeObj *pVnode = (SVnodeObj *) pNode; + if (pVnode->dirty) return; + + SDMStatusMsg *pStatus = param; + if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; + + SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; + pLoad->vgId = htonl(pVnode->vgId); + pLoad->status = pVnode->status; +} + static void dnodeSendStatusMsg(void *handle, void *tmrId) { + if (tsDnodeTmr == NULL) { + dError("dnode timer is already released"); + return; + } + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); if (tsStatusTimer == NULL) { dError("failed to start status timer"); return; } -// int32_t contLen = sizeof(SDMStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad); -// SDMStatusMsg *pStatus = rpcMallocCont(contLen); -// if (pStatus == NULL) { -// dError("Failed to malloc status message"); -// return; -// } -// -// int32_t totalVnodes = dnodeGetVnodesNum(); -// -// pStatus->version = htonl(tsVersion); -// pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); -// pStatus->publicIp = htonl(inet_addr(tsPublicIp)); -// pStatus->lastReboot = htonl(tsRebootTime); -// pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); -// pStatus->openVnodes = htons((uint16_t) totalVnodes); -// pStatus->numOfCores = htons((uint16_t) tsNumOfCores); -// pStatus->diskAvailable = tsAvailDataDirGB; -// pStatus->alternativeRole = (uint8_t) tsAlternativeRole; -// -// SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load; - - //TODO loop all vnodes - // for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) { - // if (vnodeList[vnode].cfg.maxSessions <= 0) continue; - // - // SVnodeObj *pVnode = vnodeList + vnode; - // pLoad->vnode = htonl(vnode); - // pLoad->vgId = htonl(pVnode->cfg.vgId); - // pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus; - // pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus; - // pLoad->accessState = (uint8_t)(pVnode->accessState); - // pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage); - // pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage); - // if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) { - // pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten); - // } else { - // pLoad->pointsWritten = htobe64(0); - // } - // pLoad++; - // - // if (++count >= tsOpenVnodes) { - // break; - // } - // } - -// dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen); + int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SDMStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + dError("failed to malloc status message"); + return; + } + + strcpy(pStatus->dnodeName, tsDnodeName); + pStatus->version = htonl(tsVersion); + pStatus->dnodeId = htonl(tsDnodeId); + pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); + pStatus->publicIp = htonl(inet_addr(tsPublicIp)); + pStatus->lastReboot = htonl(tsRebootTime); + pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); + pStatus->numOfCores = htons((uint16_t) tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + pStatus->alternativeRole = (uint8_t) tsAlternativeRole; + + taosVisitIntHashWithFp(tsDnodeVnodesHash, dnodeBuildVloadMsg, pStatus); + contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); + pStatus->openVnodes = htons(pStatus->openVnodes); + + SRpcMsg rpcMsg = { + .pCont = pStatus, + .contLen = contLen, + .msgType = TSDB_MSG_TYPE_DM_STATUS + }; + + dnodeSendMsgToMnode(&rpcMsg); +} + +static void dnodeReadDnodeId() { + char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); + + FILE *fp = fopen(dnodeIdFile, "r"); + if (!fp) return; + + char option[32] = {0}; + int32_t value = 0; + int32_t num = 0; + + fscanf(fp, "%s %d", option, &value); + if (num != 2) return false; + if (strcmp(option, "dnodeId") != 0) return false; + tsDnodeId = value;; + + fclose(fp); + dPrint("read dnodeId:%d successed", tsDnodeId); } +static void dnodeSaveDnodeId() { + char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); + + FILE *fp = fopen(dnodeIdFile, "w"); + if (!fp) { + return false; + } + + fprintf(fp, "dnodeId %d\n", tsDnodeId); + + fclose(fp); + dPrint("save dnodeId successed"); + + return true; +} +void dnodeUpdateDnodeId(int32_t dnodeId) { + if (tsDnodeId == 0) { + dPrint("dnodeId is set to %d", dnodeId); + tsDnodeId = dnodeId; + dnodeSaveDnodeId(); + } +} diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeMnode.c index da47e4e49cede34dac50f421e419b504b4b48181..9d1be0148ebf3e011ec0e649e38d2cf402b42d2e 100644 --- a/src/dnode/src/dnodeMnode.c +++ b/src/dnode/src/dnodeMnode.c @@ -40,12 +40,12 @@ int32_t dnodeInitMnode() { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; rpcInit.localPort = tsDnodeMnodePort; - rpcInit.label = "DND-mnode"; + rpcInit.label = "DND-MS"; rpcInit.numOfThreads = 1; rpcInit.cfp = dnodeProcessMsgFromMnode; - rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; + rpcInit.sessions = 100; rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.idleTime = tsShellActivityTimer * 2000; tsDnodeMnodeRpc = rpcOpen(&rpcInit); if (tsDnodeMnodeRpc == NULL) { @@ -61,6 +61,7 @@ void dnodeCleanupMnode() { if (tsDnodeMnodeRpc) { rpcClose(tsDnodeMnodeRpc); tsDnodeMnodeRpc = NULL; + dPrint("mnode rpc server is closed"); } } diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 9f5164dd5bf67c4278cc89719161d72441c022a9..6835f6379305f74c57855edb17bc80714128bd2e 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -74,7 +74,7 @@ int32_t dnodeInitModules() { for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) { if (tsModule[mod].num != 0 && tsModule[mod].initFp) { if ((*tsModule[mod].initFp)() != 0) { - dError("TDengine initialization failed"); + dError("failed to init modules"); return -1; } } @@ -92,3 +92,36 @@ void dnodeStartModules() { } } } + +void dnodeProcessModuleStatus(uint32_t moduleStatus) { + if (moduleStatus == tsModuleStatus) return; + + dPrint("module status is received, old:%d, new:%d", tsModuleStatus, moduleStatus); + + int news = moduleStatus; + int olds = tsModuleStatus; + + for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { + int newStatus = news & (1 << moduleType); + int oldStatus = olds & (1 << moduleType); + + if (oldStatus > 0) { + if (newStatus == 0) { + if (tsModule[moduleType].stopFp) { + dPrint("module:%s is stopped on this node", tsModule[moduleType].name); + (*tsModule[moduleType].stopFp)(); + } + } + } else if (oldStatus == 0) { + if (newStatus > 0) { + if (tsModule[moduleType].startFp) { + dPrint("module:%s is started on this node", tsModule[moduleType].name); + (*tsModule[moduleType].startFp)(); + } + } + } else { + } + } + + tsModuleStatus = moduleStatus; +} diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index b10ca16467073d7f5da7751f914717b14e70178e..1deefd4f532df6eb6a4f14c5736b41f296c9e967 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -94,7 +94,7 @@ void dnodeWrite(SRpcMsg *pMsg) { SRpcContext *pRpcContext = NULL; if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { - SMsgDesc *pDesc = pCont; + SMsgDesc *pDesc = (SMsgDesc *)pCont; pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); pCont += sizeof(SMsgDesc); if (pDesc->numOfVnodes > 1) { diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 48aeb2dfe6b2736f93437065b87f4c21f550e8a0..d42d2f45f0929d67686487b45d7964b099e9f096 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -40,6 +40,7 @@ extern "C" { #include "tutil.h" typedef struct { + int32_t dnodeId; uint32_t privateIp; int32_t sid; uint32_t moduleStatus; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 712c99adc2261e8e62d1a6a8904f17df95d3dc46..7aac36786a32288a9d2947f010ad4a50f0495ace 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -165,6 +165,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_MAX_COLUMNS 256 #define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns +#define TSDB_DNODE_NAME_LEN 63 #define TSDB_TABLE_NAME_LEN 64 #define TSDB_DB_NAME_LEN 32 #define TSDB_COL_NAME_LEN 64 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index e4b083bb9d7d73162dac378e9b818c668df8c91b..38fcb3fd42a58e3d7bbfa6a470053031652b98d3 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -25,7 +25,6 @@ extern "C" { #include "taosdef.h" #include "taoserror.h" -#include "taosdef.h" #include "trpc.h" // message type @@ -520,16 +519,14 @@ typedef struct { } SRetrieveTableRsp; typedef struct { - uint32_t vnode; - uint32_t vgId; - uint8_t status; - uint8_t dropStatus; - uint8_t accessState; - int64_t totalStorage; - int64_t compStorage; - int64_t pointsWritten; - uint8_t syncStatus; - uint8_t reserved[15]; + int32_t vgId; + int64_t totalStorage; + int64_t compStorage; + int64_t pointsWritten; + uint8_t status; + uint8_t syncStatus; + uint8_t accessState; + uint8_t reserved[6]; } SVnodeLoad; typedef struct { @@ -582,14 +579,16 @@ typedef struct { } SVnodeStatisticInfo; typedef struct { + int32_t dnodeId; uint32_t moduleStatus; uint32_t createdTime; uint32_t numOfVnodes; - uint32_t reserved; } SDnodeState; typedef struct { uint32_t version; + int32_t dnodeId; + char dnodeName[TSDB_DNODE_NAME_LEN]; uint32_t privateIp; uint32_t publicIp; uint32_t lastReboot; // time stamp for last reboot @@ -603,14 +602,12 @@ typedef struct { } SDMStatusMsg; typedef struct { - int32_t code; - SDnodeState dnodeState; SRpcIpSet ipList; + SDnodeState dnodeState; SVnodeAccess vnodeAccess[]; } SDMStatusRsp; typedef struct { - int32_t vnode; SVnodeCfg cfg; SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; } SMDCreateVnodeMsg; diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 56a8fd054f2ce2bd35c6ed2d311f237c3ca85dee..b7c68b5f8062240fd1f7c71f1e789d3a31f08630 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -32,15 +32,15 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); void mgmtCreateVgroup(SQueuedMsg *pMsg); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtUpdateVgroup(SVgObj *pVgroup); +void mgmtUpdateVgroupIp(SDnodeObj *pDnode); void mgmtSetVgroupIdPool(); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); - -SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode); -void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index 98a2e387285889bbcf5fdcf94e1e700c7fa3a6c7..ce7ac8cb39dc5a6ac41fd7174dd5c1b9ec3cf4c1 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -299,62 +299,4 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s // mgmtCleanUpDnodeIntFp(); // } //} -// -//void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { -// SDMStatusMsg *pStatus = (SDMStatusMsg *)pCont; -// -// SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp)); -// if (pObj == NULL) { -// mError("dnode:%s not exist", taosIpStr(pObj->privateIp)); -// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_DNODE_NOT_EXIST, NULL, 0); -// return; -// } -// -// pObj->lastReboot = htonl(pStatus->lastReboot); -// pObj->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); -// pObj->openVnodes = htons(pStatus->openVnodes); -// pObj->numOfCores = htons(pStatus->numOfCores); -// pObj->diskAvailable = pStatus->diskAvailable; -// pObj->alternativeRole = pStatus->alternativeRole; -//// -//// if (mgmtProcessDnodeStatusFp) { -//// mgmtProcessDnodeStatusFp(pStatus, pObj, pConn); -//// return; -//// } -// -// pObj->status = TSDB_DN_STATUS_READY; -// -//// // wait vnode dropped -//// for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) { -//// SVnodeLoad *pVload = &(pObj->vload[vnode]); -//// if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { -//// bool existInDnode = false; -//// for (int32_t j = 0; j < pObj->openVnodes; ++j) { -//// if (htonl(pStatus->load[j].vnode) == vnode) { -//// existInDnode = true; -//// break; -//// } -//// } -//// -//// if (!existInDnode) { -//// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; -//// pVload->status = TSDB_VN_STATUS_OFFLINE; -//// mgmtUpdateDnode(pObj); -//// mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode); -//// taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr); -//// } -//// } else if (pVload->vgId == 0) { -//// /* -//// * In some cases, vnode information may be reported abnormally, recover it -//// */ -//// if (pVload->dropStatus != TSDB_VN_DROP_STATUS_READY || pVload->status != TSDB_VN_STATUS_OFFLINE) { -//// mPrint("dnode:%s, vid:%d, vgroup:%d status:%s dropStatus:%s, set it to avail status", -//// taosIpStr(pObj->privateIp), vnode, pVload->vgId, taosGetVnodeStatusStr(pVload->status), -//// taosGetVnodeDropStatusStr(pVload->dropStatus)); -//// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY; -//// pVload->status = TSDB_VN_STATUS_OFFLINE; -//// mgmtUpdateDnode(pObj); -//// } -//// } -//// } -//} +// \ No newline at end of file diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index fcd9e5823120fadc0701380204a32c16baf01c82..19ee53af0c5bd128dc49259408a661c863859a91 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -23,12 +23,14 @@ #include "mgmtDClient.h" #include "mgmtMnode.h" #include "mgmtShell.h" +#include "mgmtDServer.h" #include "mgmtUser.h" #include "mgmtVgroup.h" int32_t (*mgmtInitDnodesFp)() = NULL; void (*mgmtCleanUpDnodesFp)() = NULL; SDnodeObj *(*mgmtGetDnodeFp)(uint32_t ip) = NULL; +SDnodeObj *(*mgmtGetDnodeByIpFp)(int32_t dnodeId) = NULL; int32_t (*mgmtGetDnodesNumFp)() = NULL; int32_t (*mgmtUpdateDnodeFp)(SDnodeObj *pDnode) = NULL; void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL; @@ -45,6 +47,7 @@ static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg); static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ; +static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; @@ -70,10 +73,9 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) { for (int32_t i = 0; i < pDnode->numOfVnodes; ++i) { SVnodeLoad *pVload = pDnode->vload + i; if (pVload->vgId != 0) { - mTrace("%d-dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s", - totalVnodes, taosIpStr(pDnode->privateIp), i, pVload->vgId, + mTrace("dnode:%d, calc free vnodes, vnode:%d, status:%d %s, syncstatus:%d %s", + pDnode->dnodeId, pVload->vgId, pVload->status, taosGetVnodeStatusStr(pVload->status), - pVload->dropStatus, taosGetVnodeDropStatusStr(pVload->dropStatus), pVload->syncStatus, taosGetVnodeSyncStatusStr(pVload->syncStatus)); totalVnodes++; } @@ -92,7 +94,7 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) { if (pDnode) { SVnodeLoad *pVload = pDnode->vload + vnodeGid[i].vnode; memset(pVload, 0, sizeof(SVnodeLoad)); - pVload->vnode = vnodeGid[i].vnode; + //pVload->vnode = vnodeGid[i].vnode; pVload->vgId = vgId; mTrace("dnode:%s, vnode:%d add to vgroup:%d", taosIpStr(pDnode->privateIp), vnodeGid[i].vnode, pVload->vgId); mgmtCalcNumOfFreeVnodes(pDnode); @@ -118,7 +120,6 @@ void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes) { } } - bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { uint32_t status = pDnode->moduleStatus & (1 << moduleType); return status > 0; @@ -319,12 +320,6 @@ static int32_t mgmtGetVnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "vgid"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - pShow->bytes[cols] = 12; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "status"); @@ -400,10 +395,6 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi cols = 0; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(uint32_t *)pWrite = pVnode->vnode; - cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(uint32_t *)pWrite = pVnode->vgId; cols++; @@ -437,11 +428,12 @@ int32_t mgmtInitDnodes() { mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); - + mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); if (mgmtInitDnodesFp) { return mgmtInitDnodesFp(); } else { + tsDnodeObj.dnodeId = 1; tsDnodeObj.privateIp = inet_addr(tsPrivateIp);; tsDnodeObj.createdTime = taosGetTimestampMs(); tsDnodeObj.lastReboot = taosGetTimestampSec(); @@ -478,6 +470,16 @@ SDnodeObj *mgmtGetDnode(uint32_t ip) { } } +SDnodeObj *mgmtGetDnodeByIp(int32_t dnodeId) { + if (mgmtGetDnodeByIpFp) { + return mgmtGetDnodeByIpFp(dnodeId); + } + if (dnodeId != 0) { + return &tsDnodeObj; + } + return NULL; +} + int32_t mgmtGetDnodesNum() { if (mgmtGetDnodesNumFp) { return mgmtGetDnodesNumFp(); @@ -569,4 +571,130 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { mTrace("cfg vnode rsp is received"); -} \ No newline at end of file +} + +void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { + if (mgmtCheckRedirect(rpcMsg->handle)) return; + + SDMStatusMsg *pStatus = rpcMsg->pCont; + pStatus->dnodeId = htonl(pStatus->dnodeId); + + SDnodeObj *pDnode = NULL; + if (pStatus->dnodeId == 0) { + pDnode = mgmtGetDnodeByIp(pStatus->privateIp); + if (pDnode == NULL) { + mTrace("dnode not created in cluster, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); + return; + } + } + + uint32_t version = htonl(pStatus->version); + if (version != tsVersion) { + mError("dnode:%d, status msg version:%d not equal with mnode:%d", pDnode->dnodeId, version, tsVersion); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_VERSION); + return ; + } + + uint32_t lastPrivateIp = htonl(pDnode->privateIp); + uint32_t lastPublicIp = htonl(pDnode->publicIp); + + pDnode->privateIp = htonl(pStatus->privateIp); + pDnode->publicIp = htonl(pStatus->publicIp); + pDnode->lastReboot = htonl(pStatus->lastReboot); + pDnode->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); + pDnode->openVnodes = htons(pStatus->openVnodes); + pDnode->numOfCores = htons(pStatus->numOfCores); + pDnode->diskAvailable = pStatus->diskAvailable; + pDnode->alternativeRole = pStatus->alternativeRole; + + if (pStatus->dnodeId == 0) { + mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pStatus->dnodeId), pStatus->dnodeName); + mgmtSetDnodeMaxVnodes(pDnode); + mgmtUpdateDnode(pDnode); + } + + if (lastPrivateIp != pDnode->privateIp || lastPublicIp != pDnode->publicIp) { + mgmtUpdateVgroupIp(pDnode); + //mgmtUpdateMnodeIp(); + } + + for (int32_t j = 0; j < pDnode->openVnodes; ++j) { + pStatus->load[j].vgId = htonl(pStatus->load[j].vgId); + pStatus->load[j].totalStorage = htobe64(pStatus->load[j].totalStorage); + pStatus->load[j].compStorage = htobe64(pStatus->load[j].compStorage); + pStatus->load[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten); + + bool existInMnode = false; + for (int32_t vnode = 0; vnode < pDnode->numOfVnodes; ++vnode) { + SVnodeLoad *pVload = &(pDnode->vload[vnode]); + if (pVload->vgId == pStatus->load[j].vgId) { + existInMnode = true; + } + } + + if (!existInMnode) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); + mPrint("dnode:%d, vnode:%d not exist in mnode, drop it", pDnode->dnodeId, pStatus->load[j].vgId); + mgmtSendDropVnodeMsg(pStatus->load[j].vgId, &ipSet, NULL); + } + } + + for (int32_t vnode = 0; vnode < pDnode->numOfVnodes; ++vnode) { + SVnodeLoad *pVload = &(pDnode->vload[vnode]); + + bool existInDnode = false; + for (int32_t j = 0; j < pDnode->openVnodes; ++j) { + if (htonl(pStatus->load[j].vgId) == pVload->vgId) { + existInDnode = true; + break; + } + } + + if (!existInDnode) { + mPrint("dnode:%d, vnode:%d not exist in dnode, create it", pDnode->dnodeId, pVload->vgId); + SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId); + if (pVgroup != NULL) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); + mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL); + } + } + } + + if (pDnode->status != TSDB_DN_STATUS_READY) { + mTrace("dnode:%d, from offline to online", pDnode->dnodeId); + pDnode->status = TSDB_DN_STATUS_READY; + //TODO: + //mgmtStartBalanceTimer(200); + } + + int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess); + SDMStatusRsp *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SERV_OUT_OF_MEMORY); + return; + } + + pRsp->ipList = *pSdbIpList; + pRsp->ipList.port = htons(pRsp->ipList.port); + for (int i = 0; i < pRsp->ipList.numOfIps; ++i) { + pRsp->ipList.ip[i] = htonl(pRsp->ipList.ip[i]); + } + + pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); + pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); + pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000); + pRsp->dnodeState.numOfVnodes = 0; + + contLen = sizeof(SDMStatusRsp); + + //TODO: set vnode access + + SRpcMsg rpcRsp = { + .code = TSDB_CODE_SUCCESS, + .pCont = pStatus, + .contLen = contLen + }; + + rpcSendResponse(&rpcRsp); +} diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 637d0a1107fa60d564d01f68d13323940360cf38..f53ca3f95fb62b1fd397a9ca02fb63bca3fb2383 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -19,4 +19,5 @@ bool mgmtCheckRedirect(void *handle) { return false; -} \ No newline at end of file +} + diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 223e67ebfb8e5f4e2d9b1e8ac1c271b1b75360a4..e87c964496fbcfe8d83c875f2f2adff933acefd9 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -392,7 +392,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { } if (pConnectMsg->db[0]) { - char dbName[TSDB_TABLE_ID_LEN] = {0}; + char dbName[TSDB_TABLE_ID_LEN * 3] = {0}; sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); SDbObj *pDb = mgmtGetDb(dbName); if (pDb == NULL) { diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index c2f3e199517d2e13b00d382087ec72969c3aa589..7faaed1d8097284f1176e0ac6813931c265b5e52 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -316,9 +316,9 @@ char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { } SVnodeLoad *vload = pDnode->vload + pVnode->vnode; - if (vload->vgId != pVgroup->vgId || vload->vnode != pVnode->vnode) { - mError("dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d", - taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode, vload->vgId, vload->vnode); + if (vload->vgId != pVgroup->vgId) { + mError("dnode:%s, vgroup:%d, not same with dnode vgroup:%d", + taosIpStr(pVnode->ip), pVgroup->vgId, vload->vgId); return "null"; } @@ -489,15 +489,14 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { taosFreeId(pVgroup->idPool, pTable->sid); } -SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { +SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) return NULL; SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg)); if (pVnode == NULL) return NULL; - pVnode->vnode = htonl(vnode); - pVnode->cfg = pDb->cfg; + pVnode->cfg = pDb->cfg; SVnodeCfg *pCfg = &pVnode->cfg; pCfg->vgId = htonl(pVgroup->vgId); @@ -517,7 +516,6 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { vpeerDesc[j].vgId = htonl(pVgroup->vgId); vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip); - vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); } return pVnode; @@ -559,9 +557,9 @@ SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) { return ipSet; } -void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { - mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); - SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode); +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { + mTrace("vgroup:%d, send create msg, ahandle:%p", pVgroup->vgId, ahandle); + SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .handle = ahandle, .pCont = pCreate, @@ -576,7 +574,7 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); + mgmtSendCreateVnodeMsg(pVgroup, &ipSet, ahandle); } } @@ -615,17 +613,17 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { free(queueMsg); } -static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(SVgObj *pVgroup) { +static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(int32_t vgId) { SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg)); if (pDrop == NULL) return NULL; - pDrop->vgId = htonl(pVgroup->vgId); + pDrop->vgId = htonl(vgId); return pDrop; } -static void mgmtSendDropVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { - mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", pVgroup->vgId, ahandle); - SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(pVgroup); +void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { + mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", vgId, ahandle); + SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(vgId); SRpcMsg rpcMsg = { .handle = ahandle, .pCont = pDrop, @@ -640,7 +638,7 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - mgmtSendDropVnodeMsg(pVgroup, &ipSet, ahandle); + mgmtSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle); } } @@ -675,4 +673,25 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { mgmtAddToShellQueue(newMsg); free(queueMsg); -} \ No newline at end of file +} + +void mgmtUpdateVgroupIp(SDnodeObj *pDnode) { + void * pNode = NULL; + SVgObj *pVgroup = NULL; + while (1) { + pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); + if (pVgroup == NULL) break; + + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SVnodeGid *vnodeGid = pVgroup->vnodeGid + i; + if (vnodeGid->dnodeId == pDnode->dnodeId) { + mPrint("vgroup:%d, dnode:%d, privateIp:%s change to %s, publicIp:%s change to %s", + pVgroup->vgId, vnodeGid->dnodeId, pDnode->privateIp, taosIpStr(vnodeGid->ip), + pDnode->publicIp, taosIpStr(vnodeGid->publicIp)); + vnodeGid->publicIp = pDnode->publicIp; + vnodeGid->ip = pDnode->privateIp; + sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1); + } + } + } +} diff --git a/src/util/inc/ihash.h b/src/util/inc/ihash.h index 1d7a8f79309dd3a5bfc2d8eee67f1b7bedb187ee..f283abe737b62cba17a246e1942c22c4a0b90123 100644 --- a/src/util/inc/ihash.h +++ b/src/util/inc/ihash.h @@ -36,7 +36,7 @@ int32_t taosHashInt(void *handle, uint64_t key); void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)); -char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)); +void taosVisitIntHashWithFp(void *handle, void (*fp)(char *, void *), void *param); int32_t taosGetIntHashSize(void *handle); diff --git a/src/util/src/ihash.c b/src/util/src/ihash.c index a61ce6654fe614ddbcd5c7720f8ec40d484161d5..6b58d8ef31c3b83c8c0160364abf2ae97132d0f0 100644 --- a/src/util/src/ihash.c +++ b/src/util/src/ihash.c @@ -187,7 +187,6 @@ void taosCleanUpIntHash(void *handle) { free(pObj); } - void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) { IHashObj * pObj; IHashNode *pNode, *pNext; @@ -202,7 +201,7 @@ void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) { pNode = pObj->hashList[i]; while (pNode) { pNext = pNode->next; - if (fp != NULL) fp(pNode->data); + if (fp != NULL) (*fp)(pNode->data); free(pNode); pNode = pNext; } @@ -219,7 +218,7 @@ void taosCleanUpIntHashWithFp(void *handle, void (*fp)(char *)) { free(pObj); } -char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)) { +void taosVisitIntHashWithFp(void *handle, int (*fp)(char *, void *), void *param) { IHashObj * pObj; IHashNode *pNode, *pNext; char * pData = NULL; @@ -234,21 +233,13 @@ char *taosVisitIntHashWithFp(void *handle, int (*fp)(char *)) { pNode = pObj->hashList[i]; while (pNode) { pNext = pNode->next; - int flag = fp(pNode->data); - if (flag) { - pData = pNode->data; - goto VisitEnd; - } - + (*fp)(pNode->data, param); pNode = pNext; } } } -VisitEnd: - pthread_mutex_unlock(&pObj->mutex); - return pData; } int32_t taosGetIntHashSize(void *handle) { diff --git a/src/util/src/tstatus.c b/src/util/src/tstatus.c index bb7fc3f0b1d0c315e377fa0d7813e693ba67b5f9..39704464e800db3be75c623eaac437e9a3fe2214 100644 --- a/src/util/src/tstatus.c +++ b/src/util/src/tstatus.c @@ -18,13 +18,13 @@ char* taosGetVgroupStatusStr(int32_t vgroupStatus) { switch (vgroupStatus) { - case TSDB_VG_STATUS_READY: return tstrerror(vgroupStatus); - case TSDB_VG_STATUS_IN_PROGRESS: return tstrerror(vgroupStatus); - case TSDB_VG_STATUS_NO_DISK_PERMISSIONS: return tstrerror(vgroupStatus); - case TSDB_VG_STATUS_SERVER_NO_PACE: return tstrerror(vgroupStatus); - case TSDB_VG_STATUS_SERV_OUT_OF_MEMORY: return tstrerror(vgroupStatus); - case TSDB_VG_STATUS_INIT_FAILED: return tstrerror(vgroupStatus); - case TSDB_VG_STATUS_FULL: return tstrerror(vgroupStatus); + case TSDB_VG_STATUS_READY: return (char*)tstrerror(vgroupStatus); + case TSDB_VG_STATUS_IN_PROGRESS: return (char*)tstrerror(vgroupStatus); + case TSDB_VG_STATUS_NO_DISK_PERMISSIONS: return (char*)tstrerror(vgroupStatus); + case TSDB_VG_STATUS_SERVER_NO_PACE: return (char*)tstrerror(vgroupStatus); + case TSDB_VG_STATUS_SERV_OUT_OF_MEMORY: return (char*)tstrerror(vgroupStatus); + case TSDB_VG_STATUS_INIT_FAILED: return (char*)tstrerror(vgroupStatus); + case TSDB_VG_STATUS_FULL: return (char*)tstrerror(vgroupStatus); default: return "undefined"; } } diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 9b7215d7da12580ce512b0314a6b92fe7bf711e6..f6e97aefb0727885ec512fc71daee28d862289b1 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -35,7 +35,6 @@ extern "C" { // --------- TSDB REPOSITORY CONFIGURATION DEFINITION typedef struct { int8_t precision; - int32_t vgId; int32_t tsdbId; int32_t maxTables; // maximum number of tables this repository can have int32_t daysPerFile; // day per file sharding policy