diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ba9147dcdd95426fcbcb2883cbcdbbd6d6cce3de..78397547353e5b51f660b3fbbc2bd0fae0f7a701 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -744,8 +744,8 @@ typedef struct { } SVnodeLoad; typedef struct { - int32_t sver; // software version - int64_t dver; // dnode table version in sdb + int32_t sver; // software version + int64_t dnodeVer; // dnode table version in sdb int32_t dnodeId; int64_t clusterId; int64_t rebootTime; @@ -772,7 +772,7 @@ typedef struct { } SDnodeEp; typedef struct { - int64_t dver; + int64_t dnodeVer; SDnodeCfg dnodeCfg; SArray* pDnodeEps; // Array of SDnodeEp } SStatusRsp; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0edfcb3314c37c0b0b21223137fa29347f3cb81f..a9ecdd659ff7a53f0544f0eb3fdd61f6cd2b9fc9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -868,7 +868,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { // status if (tEncodeI32(&encoder, pReq->sver) < 0) return -1; - if (tEncodeI64(&encoder, pReq->dver) < 0) return -1; + if (tEncodeI64(&encoder, pReq->dnodeVer) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; if (tEncodeI64(&encoder, pReq->clusterId) < 0) return -1; if (tEncodeI64(&encoder, pReq->rebootTime) < 0) return -1; @@ -913,7 +913,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { // status if (tDecodeI32(&decoder, &pReq->sver) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->dver) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->dnodeVer) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->clusterId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->rebootTime) < 0) return -1; @@ -965,7 +965,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { if (tStartEncode(&encoder) < 0) return -1; // status - if (tEncodeI64(&encoder, pRsp->dver) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->dnodeVer) < 0) return -1; // dnode cfg if (tEncodeI32(&encoder, pRsp->dnodeCfg.dnodeId) < 0) return -1; @@ -996,7 +996,7 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; // status - if (tDecodeI64(&decoder, &pRsp->dver) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->dnodeVer) < 0) return -1; // cluster cfg if (tDecodeI32(&decoder, &pRsp->dnodeCfg.dnodeId) < 0) return -1; diff --git a/source/dnode/mgmt/CMakeLists.txt b/source/dnode/mgmt/CMakeLists.txt index f4987734628d7947724d67bdd0746927dc80c8bf..40c3f5b1d3d2c6a92066e6d41f71fe2feb690c55 100644 --- a/source/dnode/mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/CMakeLists.txt @@ -1,3 +1,5 @@ +add_subdirectory(interface) + aux_source_directory(dm DNODE_SRC) aux_source_directory(qm DNODE_SRC) aux_source_directory(bm DNODE_SRC) @@ -7,19 +9,20 @@ aux_source_directory(mm DNODE_SRC) aux_source_directory(main DNODE_SRC) add_library(dnode STATIC ${DNODE_SRC}) target_link_libraries( - dnode cjson mnode vnode qnode snode bnode wal sync taos tfs monitor + dnode dnode_interface ) target_include_directories( dnode PUBLIC "${TD_SOURCE_DIR}/include/dnode/mgmt" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/interface/inc" ) aux_source_directory(exe EXEC_SRC) add_executable(taosd ${EXEC_SRC}) target_include_directories( taosd - PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/interface/inc" ) target_link_libraries(taosd dnode) diff --git a/source/dnode/mgmt/bm/bmHandle.c b/source/dnode/mgmt/bm/bmHandle.c index d11059260317d287398320902e5889a72c71a463..9b33533a8a29ca190717681e07dd6a1fe1a178bc 100644 --- a/source/dnode/mgmt/bm/bmHandle.c +++ b/source/dnode/mgmt/bm/bmHandle.c @@ -53,9 +53,9 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (createReq.dnodeId != pDnode->dnodeId) { + if (createReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId); + dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->data.dnodeId); return -1; } else { return dndOpenNode(pWrapper); @@ -72,7 +72,7 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pDnode->dnodeId) { + if (dropReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop bnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/bm/bmInt.c b/source/dnode/mgmt/bm/bmInt.c index 990c7873a98d4e5cf608f6546a1c43929d7c30ed..71ba2bab8c3cc88dd0ce002c70c0a0cbb5467fa3 100644 --- a/source/dnode/mgmt/bm/bmInt.c +++ b/source/dnode/mgmt/bm/bmInt.c @@ -113,8 +113,8 @@ void bmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = bmOpen; mgmtFp.closeFp = bmClose; - mgmtFp.createMsgFp = bmProcessCreateReq; - mgmtFp.dropMsgFp = bmProcessDropReq; + mgmtFp.createFp = bmProcessCreateReq; + mgmtFp.dropFp = bmProcessDropReq; mgmtFp.requiredFp = bmRequire; bmInitMsgHandle(pWrapper); diff --git a/source/dnode/mgmt/dm/dmFile.c b/source/dnode/mgmt/dm/dmFile.c index c1964ac8c42a39fbd2c81be5af0f4fddfb62e825..aefc28c46dcba61aa7fffefb77793f560f63c213 100644 --- a/source/dnode/mgmt/dm/dmFile.c +++ b/source/dnode/mgmt/dm/dmFile.c @@ -16,11 +16,11 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -static void dmPrintDnodes(SDnodeMgmt *pMgmt); -static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep); -static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps); +static void dmPrintDnodes(SDnodeData *pMgmt); +static bool dmIsEpChanged(SDnodeData *pMgmt, int32_t dnodeId, const char *ep); +static void dmResetDnodes(SDnodeData *pMgmt, SArray *dnodeEps); -int32_t dmReadFile(SDnodeMgmt *pMgmt) { +int32_t dmReadFile(SDnodeData *pMgmt) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 256 * 1024; @@ -62,21 +62,21 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) { dError("failed to read %s since dnodeId not found", file); goto PRASE_DNODE_OVER; } - pDnode->dnodeId = dnodeId->valueint; + pDnode->data.dnodeId = dnodeId->valueint; cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s since clusterId not found", file); goto PRASE_DNODE_OVER; } - pDnode->clusterId = atoll(clusterId->valuestring); + pDnode->data.clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); goto PRASE_DNODE_OVER; } - pDnode->dropped = dropped->valueint; + pDnode->data.dropped = dropped->valueint; cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); if (!dnodes || dnodes->type != cJSON_Array) { @@ -138,15 +138,15 @@ PRASE_DNODE_OVER: if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); - if (dmIsEpChanged(pMgmt, pDnode->dnodeId, pDnode->localEp)) { - dError("localEp %s different with %s and need reconfigured", pDnode->localEp, file); + if (dmIsEpChanged(pMgmt, pDnode->data.dnodeId, pDnode->data.localEp)) { + dError("localEp %s different with %s and need reconfigured", pDnode->data.localEp, file); return -1; } if (taosArrayGetSize(pMgmt->dnodeEps) == 0) { SDnodeEp dnodeEp = {0}; dnodeEp.isMnode = 1; - taosGetFqdnPortFromEp(pDnode->firstEp, &dnodeEp.ep); + taosGetFqdnPortFromEp(pDnode->data.firstEp, &dnodeEp.ep); taosArrayPush(pMgmt->dnodeEps, &dnodeEp); } @@ -156,7 +156,7 @@ PRASE_DNODE_OVER: return code; } -int32_t dmWriteFile(SDnodeMgmt *pMgmt) { +int32_t dmWriteFile(SDnodeData *pMgmt) { SDnode *pDnode = pMgmt->pDnode; char file[PATH_MAX]; @@ -174,9 +174,9 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) { char *content = taosMemoryCalloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pDnode->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pDnode->dropped); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pDnode->data.dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->data.clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pDnode->data.dropped); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->dnodeEps); @@ -213,7 +213,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) { return 0; } -void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) { +void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *dnodeEps) { int32_t numOfEps = taosArrayGetSize(dnodeEps); if (numOfEps <= 0) return; @@ -234,7 +234,7 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) { taosWUnLockLatch(&pMgmt->latch); } -static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps) { +static void dmResetDnodes(SDnodeData *pMgmt, SArray *dnodeEps) { if (pMgmt->dnodeEps != dnodeEps) { SArray *tmp = pMgmt->dnodeEps; pMgmt->dnodeEps = taosArrayDup(dnodeEps); @@ -265,7 +265,7 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps) { dmPrintDnodes(pMgmt); } -static void dmPrintDnodes(SDnodeMgmt *pMgmt) { +static void dmPrintDnodes(SDnodeData *pMgmt) { int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->dnodeEps); dDebug("print dnode ep list, num:%d", numOfEps); for (int32_t i = 0; i < numOfEps; i++) { @@ -274,7 +274,7 @@ static void dmPrintDnodes(SDnodeMgmt *pMgmt) { } } -static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) { +static bool dmIsEpChanged(SDnodeData *pMgmt, int32_t dnodeId, const char *ep) { bool changed = false; taosRLockLatch(&pMgmt->latch); diff --git a/source/dnode/mgmt/dm/dmHandle.c b/source/dnode/mgmt/dm/dmHandle.c index 46d94fb8449bfd9ef8d6a8c77f92779832c4c91e..36edc089f09e97dbfd78c40fd610a1fb83aac20e 100644 --- a/source/dnode/mgmt/dm/dmHandle.c +++ b/source/dnode/mgmt/dm/dmHandle.c @@ -16,20 +16,20 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -void dmSendStatusReq(SDnodeMgmt *pMgmt) { +void dmSendStatusReq(SDnodeData *pMgmt) { SDnode *pDnode = pMgmt->pDnode; SStatusReq req = {0}; taosRLockLatch(&pMgmt->latch); req.sver = tsVersion; - req.dver = pMgmt->dver; - req.dnodeId = pDnode->dnodeId; - req.clusterId = pDnode->clusterId; - req.rebootTime = pDnode->rebootTime; + req.dnodeVer = pMgmt->dnodeVer; + req.dnodeId = pDnode->data.dnodeId; + req.clusterId = pDnode->data.clusterId; + req.rebootTime = pDnode->data.rebootTime; req.updateTime = pMgmt->updateTime; req.numOfCores = tsNumOfCores; - req.numOfSupportVnodes = pDnode->numOfSupportVnodes; - tstrncpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN); + req.numOfSupportVnodes = pDnode->data.supportVnodes; + tstrncpy(req.dnodeEp, pDnode->data.localEp, TSDB_EP_LEN); req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.checkTime = 0; @@ -40,7 +40,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); taosRUnLockLatch(&pMgmt->latch); - SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES); + SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODE); if (pWrapper != NULL) { SMonVloadInfo info = {0}; dmGetVnodeLoads(pWrapper, &info); @@ -62,34 +62,34 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { tmsgSendReq(&pMgmt->msgCb, &epSet, &rpcMsg); } -static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { +static void dmUpdateDnodeCfg(SDnodeData *pMgmt, SDnodeCfg *pCfg) { SDnode *pDnode = pMgmt->pDnode; - if (pDnode->dnodeId == 0) { + if (pDnode->data.dnodeId == 0) { dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); - pDnode->dnodeId = pCfg->dnodeId; - pDnode->clusterId = pCfg->clusterId; + pDnode->data.dnodeId = pCfg->dnodeId; + pDnode->data.clusterId = pCfg->clusterId; dmWriteFile(pMgmt); taosWUnLockLatch(&pMgmt->latch); } } -int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg) { SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRsp = &pMsg->rpcMsg; if (pRsp->code != TSDB_CODE_SUCCESS) { - if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->dropped && pDnode->dnodeId > 0) { - dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->dnodeId); - pDnode->dropped = 1; + if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->data.dropped && pDnode->data.dnodeId > 0) { + dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->data.dnodeId); + pDnode->data.dropped = 1; dmWriteFile(pMgmt); } } else { SStatusRsp statusRsp = {0}; if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { - pMgmt->dver = statusRsp.dver; + pMgmt->dnodeVer = statusRsp.dnodeVer; dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps); } @@ -100,26 +100,26 @@ int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { return TSDB_CODE_SUCCESS; } -int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pRsp = &pMsg->rpcMsg; dError("auth rsp is received, but not supported yet"); return 0; } -int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pRsp = &pMsg->rpcMsg; dError("grant rsp is received, but not supported yet"); return 0; } -int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SDCfgDnodeReq *pCfg = pReq->pCont; dError("config req is received, but not supported yet"); return TSDB_CODE_OPS_NOT_SUPPORT; } -static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); if (pWrapper != NULL) { dndReleaseWrapper(pWrapper); @@ -136,7 +136,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg * return -1; } - int32_t code = (*pWrapper->fp.createMsgFp)(pWrapper, pMsg); + int32_t code = (*pWrapper->fp.createFp)(pWrapper, pMsg); if (code != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); } else { @@ -147,7 +147,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg * return code; } -static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; @@ -158,7 +158,7 @@ static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pM taosWLockLatch(&pWrapper->latch); pWrapper->deployed = false; - int32_t code = (*pWrapper->fp.dropMsgFp)(pWrapper, pMsg); + int32_t code = (*pWrapper->fp.dropFp)(pWrapper, pMsg); if (code != 0) { pWrapper->deployed = true; dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); diff --git a/source/dnode/mgmt/dm/dmInt.c b/source/dnode/mgmt/dm/dmInt.c index c710af9006feefd5284f8106ac79af8261ddf818..8151aa8d1e1378557edefeec36e05878956568e8 100644 --- a/source/dnode/mgmt/dm/dmInt.c +++ b/source/dnode/mgmt/dm/dmInt.c @@ -16,13 +16,13 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) { +void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet) { taosRLockLatch(&pMgmt->latch); *pEpSet = pMgmt->mnodeEpSet; taosRUnLockLatch(&pMgmt->latch); } -void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) { +void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet) { dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); taosWLockLatch(&pMgmt->latch); @@ -35,7 +35,7 @@ void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) { } void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; + SDnodeData *pMgmt = pWrapper->pMgmt; taosRLockLatch(&pMgmt->latch); SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); @@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd taosRUnLockLatch(&pMgmt->latch); } -void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pReq) { +void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pReq) { SDnode *pDnode = pMgmt->pDnode; SEpSet epSet = {0}; @@ -63,7 +63,7 @@ void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pReq) { dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, pDnode->localFqdn) == 0 && epSet.eps[i].port == pDnode->serverPort) { + if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) { epSet.inUse = (i + 1) % epSet.numOfEps; } @@ -80,15 +80,14 @@ static int32_t dmStart(SMgmtWrapper *pWrapper) { static int32_t dmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; - SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt)); + SDnodeData *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeData)); dInfo("dnode-mgmt start to init"); - pDnode->dnodeId = 0; - pDnode->dropped = 0; - pDnode->clusterId = 0; + pDnode->data.dnodeId = 0; + pDnode->data.dropped = 0; + pDnode->data.clusterId = 0; pMgmt->path = pWrapper->path; pMgmt->pDnode = pDnode; - pMgmt->pWrapper = pWrapper; taosInitRWLatch(&pMgmt->latch); pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -103,7 +102,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) { return -1; } - if (pDnode->dropped) { + if (pDnode->data.dropped) { dError("dnode will not start since its already dropped"); return -1; } @@ -125,7 +124,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) { } static void dmCleanup(SMgmtWrapper *pWrapper) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; + SDnodeData *pMgmt = pWrapper->pMgmt; if (pMgmt == NULL) return; dInfo("dnode-mgmt start to clean up"); diff --git a/source/dnode/mgmt/dm/dmMonitor.c b/source/dnode/mgmt/dm/dmMonitor.c index 6edf85106bcbfcc5f8d0cc6e2449ea50da2b56c4..136158e10fb8e18f1cf13db8f202772dfbd01c38 100644 --- a/source/dnode/mgmt/dm/dmMonitor.c +++ b/source/dnode/mgmt/dm/dmMonitor.c @@ -18,13 +18,13 @@ static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { pInfo->protocol = 1; - pInfo->dnode_id = pDnode->dnodeId; - pInfo->cluster_id = pDnode->clusterId; + pInfo->dnode_id = pDnode->data.dnodeId; + pInfo->cluster_id = pDnode->data.clusterId; tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); } static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { - pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f); + pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f); pInfo->has_mnode = pDnode->wrappers[MNODE].required; pInfo->has_qnode = pDnode->wrappers[QNODE].required; pInfo->has_snode = pDnode->wrappers[SNODE].required; @@ -79,7 +79,7 @@ void dmSendMonitorReport(SDnode *pDnode) { } } - pWrapper = &pDnode->wrappers[VNODES]; + pWrapper = &pDnode->wrappers[VNODE]; if (getFromAPI) { if (dndMarkWrapper(pWrapper) == 0) { vmGetMonitorInfo(pWrapper, &vmInfo); diff --git a/source/dnode/mgmt/dm/dmWorker.c b/source/dnode/mgmt/dm/dmWorker.c index 7009469c00dae45ec100f08718bdd22239edbeba..6886c56fabc5947b6324db91e6a3ae4b18860878 100644 --- a/source/dnode/mgmt/dm/dmWorker.c +++ b/source/dnode/mgmt/dm/dmWorker.c @@ -17,7 +17,7 @@ #include "dmInt.h" static void *dmThreadRoutine(void *param) { - SDnodeMgmt *pMgmt = param; + SDnodeData *pMgmt = param; SDnode *pDnode = pMgmt->pDnode; int64_t lastStatusTime = taosGetTimestampMs(); int64_t lastMonitorTime = lastStatusTime; @@ -27,7 +27,7 @@ static void *dmThreadRoutine(void *param) { while (true) { taosThreadTestCancel(); taosMsleep(200); - if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->dropped) { + if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->data.dropped) { continue; } @@ -47,7 +47,7 @@ static void *dmThreadRoutine(void *param) { return TSDB_CODE_SUCCESS; } -int32_t dmStartThread(SDnodeMgmt *pMgmt) { +int32_t dmStartThread(SDnodeData *pMgmt) { pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt); if (pMgmt->threadId == NULL) { dError("failed to init dnode thread"); @@ -59,7 +59,7 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) { } static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { - SDnodeMgmt *pMgmt = pInfo->ahandle; + SDnodeData *pMgmt = pInfo->ahandle; SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; @@ -95,7 +95,7 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -int32_t dmStartWorker(SDnodeMgmt *pMgmt) { +int32_t dmStartWorker(SDnodeData *pMgmt) { SSingleWorkerCfg mcfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mcfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); @@ -112,7 +112,7 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) { return 0; } -void dmStopWorker(SDnodeMgmt *pMgmt) { +void dmStopWorker(SDnodeData *pMgmt) { tSingleWorkerCleanup(&pMgmt->mgmtWorker); tSingleWorkerCleanup(&pMgmt->monitorWorker); @@ -124,7 +124,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; + SDnodeData *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); @@ -133,7 +133,7 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; + SDnodeData *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); diff --git a/source/dnode/mgmt/exe/dndMain.c b/source/dnode/mgmt/exe/dndMain.c index 997c56f9fb10f63b67bf858de4b76a1efedaaa4c..3779f94620118f8dcb8dd592dc55bf1432659407 100644 --- a/source/dnode/mgmt/exe/dndMain.c +++ b/source/dnode/mgmt/exe/dndMain.c @@ -26,7 +26,7 @@ static struct { char apolloUrl[PATH_MAX]; SArray *pArgs; // SConfigPair SDnode *pDnode; - EDndType ntype; + EDndNodeType ntype; } global = {0}; static void dndStopDnode(int signum, void *info, void *ctx) { @@ -46,7 +46,7 @@ static void dndSetSignalHandle() { taosSetSignal(SIGQUIT, dndStopDnode); if (!tsMultiProcess) { - } else if (global.ntype == DNODE || global.ntype == NODE_MAX) { + } else if (global.ntype == NODE_BEGIN || global.ntype == NODE_END) { taosIgnSignal(SIGCHLD); } else { taosKillChildOnParentStopped(); @@ -72,8 +72,8 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) { tstrncpy(global.envFile, argv[++i], PATH_MAX); } else if (strcmp(argv[i], "-n") == 0) { global.ntype = atoi(argv[++i]); - if (global.ntype <= DNODE || global.ntype > NODE_MAX) { - printf("'-n' range is [1 - %d], default is 0\n", NODE_MAX - 1); + if (global.ntype <= NODE_BEGIN || global.ntype > NODE_END) { + printf("'-n' range is [1 - %d], default is 0\n", NODE_END - 1); return -1; } } else if (strcmp(argv[i], "-k") == 0) { @@ -135,7 +135,7 @@ static int32_t dndInitLog() { static void dndSetProcInfo(int32_t argc, char **argv) { taosSetProcPath(argc, argv); - if (global.ntype != DNODE && global.ntype != NODE_MAX) { + if (global.ntype != NODE_BEGIN && global.ntype != NODE_END) { const char *name = dndNodeProcStr(global.ntype); taosSetProcName(argc, argv, name); } diff --git a/source/dnode/mgmt/inc/dmInt.h b/source/dnode/mgmt/inc/dmInt.h index a671368f06beff478086755db1b4ea8888fa3e2d..4d6ebd63d2dc7c1572455a758cab8f5407ed3df2 100644 --- a/source/dnode/mgmt/inc/dmInt.h +++ b/source/dnode/mgmt/inc/dmInt.h @@ -22,35 +22,20 @@ extern "C" { #endif -typedef struct SDnodeMgmt { - int64_t dver; - int64_t updateTime; - int8_t statusSent; - SEpSet mnodeEpSet; - SHashObj *dnodeHash; - SArray *dnodeEps; - TdThread *threadId; - SRWLatch latch; - SSingleWorker mgmtWorker; - SSingleWorker monitorWorker; - SMsgCb msgCb; - const char *path; - SDnode *pDnode; - SMgmtWrapper *pWrapper; -} SDnodeMgmt; + // dmFile.c -int32_t dmReadFile(SDnodeMgmt *pMgmt); -int32_t dmWriteFile(SDnodeMgmt *pMgmt); -void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps); +int32_t dmReadFile(SDnodeData *pMgmt); +int32_t dmWriteFile(SDnodeData *pMgmt); +void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *pDnodeEps); // dmHandle.c void dmInitMsgHandle(SMgmtWrapper *pWrapper); -void dmSendStatusReq(SDnodeMgmt *pMgmt); -int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +void dmSendStatusReq(SDnodeData *pMgmt); +int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); // dmMonitor.c @@ -58,9 +43,9 @@ void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); void dmSendMonitorReport(SDnode *pDnode); // dmWorker.c -int32_t dmStartThread(SDnodeMgmt *pMgmt); -int32_t dmStartWorker(SDnodeMgmt *pMgmt); -void dmStopWorker(SDnodeMgmt *pMgmt); +int32_t dmStartThread(SDnodeData *pMgmt); +int32_t dmStartWorker(SDnodeData *pMgmt); +void dmStopWorker(SDnodeData *pMgmt); int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/inc/dndInt.h b/source/dnode/mgmt/inc/dndInt.h deleted file mode 100644 index a38fe87b59ea93015eb78acb40159072efb3e1e0..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/inc/dndInt.h +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DND_INT_H_ -#define _TD_DND_INT_H_ - -#include "os.h" - -#include "cJSON.h" -#include "tcache.h" -#include "tcrc32c.h" -#include "tdatablock.h" -#include "tglobal.h" -#include "thash.h" -#include "tlockfree.h" -#include "tlog.h" -#include "tmsg.h" -#include "tmsgcb.h" -#include "tprocess.h" -#include "tqueue.h" -#include "trpc.h" -#include "tthread.h" -#include "ttime.h" -#include "tworker.h" - -#include "dnode.h" -#include "monitor.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} -#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} -#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} -#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} -#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} -#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} - -typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } EDndType; -typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; -typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus; -typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; - -typedef struct SMgmtFp SMgmtFp; -typedef struct SMgmtWrapper SMgmtWrapper; -typedef struct SMsgHandle SMsgHandle; -typedef struct SDnodeMgmt SDnodeMgmt; -typedef struct SVnodesMgmt SVnodesMgmt; -typedef struct SMnodeMgmt SMnodeMgmt; -typedef struct SQnodeMgmt SQnodeMgmt; -typedef struct SSnodeMgmt SSnodeMgmt; -typedef struct SBnodeMgmt SBnodeMgmt; - -typedef int32_t (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); -typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); -typedef int32_t (*StartNodeFp)(SMgmtWrapper *pWrapper); -typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required); - -typedef struct SMsgHandle { - SMgmtWrapper *pQndWrapper; - SMgmtWrapper *pMndWrapper; - SMgmtWrapper *pWrapper; -} SMsgHandle; - -typedef struct SMgmtFp { - OpenNodeFp openFp; - CloseNodeFp closeFp; - StartNodeFp startFp; - CreateNodeFp createMsgFp; - DropNodeFp dropMsgFp; - RequireNodeFp requiredFp; -} SMgmtFp; - -typedef struct SMgmtWrapper { - const char *name; - char *path; - int32_t refCount; - SRWLatch latch; - EDndType ntype; - bool deployed; - bool required; - EProcType procType; - int32_t procId; - SProcObj *pProc; - SShm shm; - void *pMgmt; - SDnode *pDnode; - SMgmtFp fp; - int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode - NodeMsgFp msgFps[TDMT_MAX]; -} SMgmtWrapper; - -typedef struct { - void *serverRpc; - void *clientRpc; - SMsgHandle msgHandles[TDMT_MAX]; -} STransMgmt; - -typedef struct SDnode { - int64_t clusterId; - int32_t dnodeId; - int32_t numOfSupportVnodes; - int64_t rebootTime; - char *localEp; - char *localFqdn; - char *firstEp; - char *secondEp; - char *dataDir; - SDiskCfg *disks; - int32_t numOfDisks; - uint16_t serverPort; - bool dropped; - EProcType procType; - EDndType ntype; - EDndStatus status; - EDndEvent event; - SStartupReq startup; - TdFilePtr lockfile; - STransMgmt trans; - SMgmtWrapper wrappers[NODE_MAX]; -} SDnode; - -// dndEnv.c -const char *dndStatStr(EDndStatus stat); -const char *dndNodeLogStr(EDndType ntype); -const char *dndNodeProcStr(EDndType ntype); -const char *dndEventStr(EDndEvent ev); - -// dndExec.c -int32_t dndOpenNode(SMgmtWrapper *pWrapper); -void dndCloseNode(SMgmtWrapper *pWrapper); - -// dndFile.c -int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); -int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); -TdFilePtr dndCheckRunning(const char *dataDir); -int32_t dndReadShmFile(SDnode *pDnode); -int32_t dndWriteShmFile(SDnode *pDnode); - -// dndInt.c -EDndStatus dndGetStatus(SDnode *pDnode); -void dndSetStatus(SDnode *pDnode, EDndStatus stat); -void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType nType); -int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); -void dndReleaseWrapper(SMgmtWrapper *pWrapper); -void dndHandleEvent(SDnode *pDnode, EDndEvent event); -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); -void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); - -// dndTransport.c -int32_t dndInitTrans(SDnode *pDnode); -void dndCleanupTrans(SDnode *pDnode); -SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); -SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); -int32_t dndInitMsgHandle(SDnode *pDnode); -void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); - -// mgmt -void dmSetMgmtFp(SMgmtWrapper *pWrapper); -void bmSetMgmtFp(SMgmtWrapper *pWrapper); -void qmSetMgmtFp(SMgmtWrapper *pMgmt); -void smSetMgmtFp(SMgmtWrapper *pWrapper); -void vmSetMgmtFp(SMgmtWrapper *pWrapper); -void mmSetMgmtFp(SMgmtWrapper *pMgmt); - -void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); -void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); -void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg); - -void dmGetMonitorSysInfo(SMonSysInfo *pInfo); -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); -void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo); -void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo); -void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo); -void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo); -void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DND_INT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/interface/CMakeLists.txt b/source/dnode/mgmt/interface/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..78f6ef03e5e4156354299092209e52c0ca896166 --- /dev/null +++ b/source/dnode/mgmt/interface/CMakeLists.txt @@ -0,0 +1,10 @@ +aux_source_directory(src DNODE_INTERFACE) +add_library(dnode_interface STATIC ${DNODE_INTERFACE}) +target_include_directories( + dnode_interface + PUBLIC "${TD_SOURCE_DIR}/include/dnode/mgmt" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) +target_link_libraries( + dnode_interface cjson mnode vnode qnode snode bnode wal sync taos tfs monitor util +) \ No newline at end of file diff --git a/source/dnode/mgmt/interface/inc/dndDef.h b/source/dnode/mgmt/interface/inc/dndDef.h new file mode 100644 index 0000000000000000000000000000000000000000..0ce5a64ed0ad9b4a0daa28c7522194f4f4c81536 --- /dev/null +++ b/source/dnode/mgmt/interface/inc/dndDef.h @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_DEF_H_ +#define _TD_DND_DEF_H_ + +#include "dndLog.h" + +#include "cJSON.h" +#include "tcache.h" +#include "tcrc32c.h" +#include "tdatablock.h" +#include "tglobal.h" +#include "thash.h" +#include "tlockfree.h" +#include "tlog.h" +#include "tmsg.h" +#include "tmsgcb.h" +#include "tprocess.h" +#include "tqueue.h" +#include "trpc.h" +#include "tthread.h" +#include "ttime.h" +#include "tworker.h" + +#include "dnode.h" +#include "monitor.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { NODE_BEGIN, VNODE, QNODE, SNODE, MNODE, BNODE, NODE_END } EDndNodeType; +typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndRunStatus; +typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EDndEnvStatus; +typedef enum { DND_PROC_SINGLE, DND_PROC_CHILD, DND_PROC_PARENT } EDndProcType; + +typedef int32_t (*NodeMsgFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +typedef int32_t (*OpenNodeFp)(struct SMgmtWrapper *pWrapper); +typedef void (*CloseNodeFp)(struct SMgmtWrapper *pWrapper); +typedef int32_t (*StartNodeFp)(struct SMgmtWrapper *pWrapper); +typedef void (*StopNodeFp)(struct SMgmtWrapper *pWrapper); +typedef int32_t (*CreateNodeFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +typedef int32_t (*DropNodeFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +typedef int32_t (*RequireNodeFp)(struct SMgmtWrapper *pWrapper, bool *required); + +typedef struct { + SMgmtWrapper *pQndWrapper; + SMgmtWrapper *pMndWrapper; + SMgmtWrapper *pNdWrapper; +} SMsgHandle; + +typedef struct { + OpenNodeFp openFp; + CloseNodeFp closeFp; + StartNodeFp startFp; + StopNodeFp stopFp; + CreateNodeFp createFp; + DropNodeFp dropFp; + RequireNodeFp requiredFp; +} SMgmtFp; + +typedef struct SMgmtWrapper { + SDnode *pDnode; + struct { + const char *name; + char *path; + int32_t refCount; + SRWLatch latch; + EDndNodeType ntype; + bool deployed; + bool required; + SMgmtFp fp; + void *pMgmt; + }; + struct { + EDndProcType procType; + int32_t procId; + SProcObj *procObj; + SShm procShm; + }; + struct { + int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode + NodeMsgFp msgFps[TDMT_MAX]; + }; +} SMgmtWrapper; + +typedef struct { + void *serverRpc; + void *clientRpc; + SMsgHandle msgHandles[TDMT_MAX]; +} SDnodeTrans; + +typedef struct { + int32_t dnodeId; + int64_t clusterId; + int64_t dnodeVer; + int64_t updateTime; + int64_t rebootTime; + bool dropped; + int8_t statusSent; + SEpSet mnodeEpSet; + SHashObj *dnodeHash; + SArray *dnodeEps; + TdThread *threadId; + SRWLatch latch; + SSingleWorker mgmtWorker; + SSingleWorker monitorWorker; + SMsgCb msgCb; + SDnode *pDnode; + const char *path; + TdFilePtr lockfile; + struct { + char *localEp; + char *localFqdn; + char *firstEp; + char *secondEp; + char *dataDir; + SDiskCfg *disks; + int32_t numOfDisks; + int32_t supportVnodes; + uint16_t serverPort; + }; +} SDnodeData; + +typedef struct SDnode { + EDndProcType ptype; + EDndNodeType ntype; + EDndRunStatus status; + EDndEvent event; + SStartupReq startup; + SDnodeTrans trans; + SDnodeData data; + SMgmtWrapper wrappers[NODE_END]; +} SDnode; + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_DEF_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/interface/inc/dndInt.h b/source/dnode/mgmt/interface/inc/dndInt.h new file mode 100644 index 0000000000000000000000000000000000000000..b08f372daa9c785b916ae410fe2499d4a50c79e2 --- /dev/null +++ b/source/dnode/mgmt/interface/inc/dndInt.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_INT_H_ +#define _TD_DND_INT_H_ + +#include "dndLog.h" +#include "dndDef.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// dndEnv.c +const char *dndStatStr(EDndRunStatus stat); +const char *dndNodeLogStr(EDndNodeType ntype); +const char *dndNodeProcStr(EDndNodeType ntype); +const char *dndEventStr(EDndEvent ev); + +// dndExec.c +int32_t dndOpenNode(SMgmtWrapper *pWrapper); +void dndCloseNode(SMgmtWrapper *pWrapper); + +// dndFile.c +int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); +int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); +TdFilePtr dndCheckRunning(const char *dataDir); +int32_t dndReadShmFile(SDnode *pDnode); +int32_t dndWriteShmFile(SDnode *pDnode); + +// dndInt.c +EDndRunStatus dndGetStatus(SDnode *pDnode); +void dndSetStatus(SDnode *pDnode, EDndRunStatus stat); +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType nType); +int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); +void dndReleaseWrapper(SMgmtWrapper *pWrapper); +void dndHandleEvent(SDnode *pDnode, EDndEvent event); +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); + +// dndTransport.c +int32_t dndInitTrans(SDnode *pDnode); +void dndCleanupTrans(SDnode *pDnode); +SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); +SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); +int32_t dndInitMsgHandle(SDnode *pDnode); +void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); + +// mgmt +void dmSetMgmtFp(SMgmtWrapper *pWrapper); +void bmSetMgmtFp(SMgmtWrapper *pWrapper); +void qmSetMgmtFp(SMgmtWrapper *pMgmt); +void smSetMgmtFp(SMgmtWrapper *pWrapper); +void vmSetMgmtFp(SMgmtWrapper *pWrapper); +void mmSetMgmtFp(SMgmtWrapper *pMgmt); + +void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet); +void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet); +void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pMsg); + +void dmGetMonitorSysInfo(SMonSysInfo *pInfo); +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); +void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo); +void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo); +void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo); +void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo); +void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_INT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/interface/inc/dndLog.h b/source/dnode/mgmt/interface/inc/dndLog.h new file mode 100644 index 0000000000000000000000000000000000000000..78d24e0ef5c0f0145c2febbf62d65a6ab9ed105c --- /dev/null +++ b/source/dnode/mgmt/interface/inc/dndLog.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_LOG_H_ +#define _TD_DND_LOG_H_ + +#include "tlog.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} +#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} +#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} +#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} +#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} +#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_LOG_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/interface/src/dndInt.c b/source/dnode/mgmt/interface/src/dndInt.c new file mode 100644 index 0000000000000000000000000000000000000000..9ec37b5d04a6b948bbc3f271befd65bb1f544424 --- /dev/null +++ b/source/dnode/mgmt/interface/src/dndInt.c @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndInt.h" + +EDndRunStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } + +void dndSetStatus(SDnode *pDnode, EDndRunStatus status) { + if (pDnode->status != status) { + dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status)); + pDnode->status = status; + } +} + +const char *dndStatStr(EDndRunStatus status) { + switch (status) { + case DND_STAT_INIT: + return "init"; + case DND_STAT_RUNNING: + return "running"; + case DND_STAT_STOPPED: + return "stopped"; + default: + return "UNKNOWN"; + } +} + +const char *dndNodeLogStr(EDndNodeType ntype) { + switch (ntype) { + case VNODE: + return "vnode"; + case QNODE: + return "qnode"; + case SNODE: + return "snode"; + case MNODE: + return "mnode"; + case BNODE: + return "bnode"; + default: + return "taosd"; + } +} + +const char *dndNodeProcStr(EDndNodeType ntype) { + switch (ntype) { + case VNODE: + return "taosv"; + case QNODE: + return "taosq"; + case SNODE: + return "taoss"; + case MNODE: + return "taosm"; + case BNODE: + return "taosb"; + default: + return "taosd"; + } +} + +const char *dndEventStr(EDndEvent ev) { + switch (ev) { + case DND_EVENT_START: + return "start"; + case DND_EVENT_STOP: + return "stop"; + case DND_EVENT_CHILD: + return "child"; + default: + return "UNKNOWN"; + } +} \ No newline at end of file diff --git a/source/dnode/mgmt/main/dndEnv.c b/source/dnode/mgmt/main/dndEnv.c index 3c3f2144ab26c6c395aa38219bbfe700700ebced..9f75594335021553f2dd03b511c8f925c2238286 100644 --- a/source/dnode/mgmt/main/dndEnv.c +++ b/source/dnode/mgmt/main/dndEnv.c @@ -57,63 +57,3 @@ void dndCleanup() { taosStopCacheRefreshWorker(); dInfo("dnode env is cleaned up"); } - -const char *dndStatStr(EDndStatus status) { - switch (status) { - case DND_STAT_INIT: - return "init"; - case DND_STAT_RUNNING: - return "running"; - case DND_STAT_STOPPED: - return "stopped"; - default: - return "UNKNOWN"; - } -} - -const char *dndNodeLogStr(EDndType ntype) { - switch (ntype) { - case VNODES: - return "vnode"; - case QNODE: - return "qnode"; - case SNODE: - return "snode"; - case MNODE: - return "mnode"; - case BNODE: - return "bnode"; - default: - return "taosd"; - } -} - -const char *dndNodeProcStr(EDndType ntype) { - switch (ntype) { - case VNODES: - return "taosv"; - case QNODE: - return "taosq"; - case SNODE: - return "taoss"; - case MNODE: - return "taosm"; - case BNODE: - return "taosb"; - default: - return "taosd"; - } -} - -const char *dndEventStr(EDndEvent ev) { - switch (ev) { - case DND_EVENT_START: - return "start"; - case DND_EVENT_STOP: - return "stop"; - case DND_EVENT_CHILD: - return "child"; - default: - return "UNKNOWN"; - } -} \ No newline at end of file diff --git a/source/dnode/mgmt/main/dndExec.c b/source/dnode/mgmt/main/dndExec.c index 51569d2ed4640e995f7c33e5e38bf69cec11be97..bdc9a2751aad056484502e3aff3908abfee9e36e 100644 --- a/source/dnode/mgmt/main/dndExec.c +++ b/source/dnode/mgmt/main/dndExec.c @@ -29,7 +29,7 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) { static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) { int32_t shmsize = tsMnodeShmSize; - if (pWrapper->ntype == VNODES) { + if (pWrapper->ntype == VNODE) { shmsize = tsVnodeShmSize; } else if (pWrapper->ntype == QNODE) { shmsize = tsQnodeShmSize; @@ -43,18 +43,18 @@ static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) { return -1; } - if (taosCreateShm(&pWrapper->shm, pWrapper->ntype, shmsize) != 0) { + if (taosCreateShm(&pWrapper->procShm, pWrapper->ntype, shmsize) != 0) { terrno = TAOS_SYSTEM_ERROR(terrno); dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr()); return -1; } - dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize); + dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->procShm.id, shmsize); SProcCfg cfg = dndGenProcCfg(pWrapper); cfg.isChild = false; - pWrapper->procType = PROC_PARENT; - pWrapper->pProc = taosProcInit(&cfg); - if (pWrapper->pProc == NULL) { + pWrapper->procType = DND_PROC_PARENT; + pWrapper->procObj = taosProcInit(&cfg); + if (pWrapper->procObj == NULL) { dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); return -1; } @@ -62,7 +62,7 @@ static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) { return 0; } -static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndType n) { +static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType n) { char tstr[8] = {0}; char *args[6] = {0}; snprintf(tstr, sizeof(tstr), "%d", n); @@ -85,7 +85,7 @@ static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndType n) { } static int32_t dndRunNodeProc(SMgmtWrapper *pWrapper) { - if (pWrapper->pDnode->ntype == NODE_MAX) { + if (pWrapper->pDnode->ntype == NODE_END) { dInfo("node:%s, should be started manually", pWrapper->name); } else { if (dndNewNodeProc(pWrapper, pWrapper->ntype) != 0) { @@ -93,7 +93,7 @@ static int32_t dndRunNodeProc(SMgmtWrapper *pWrapper) { } } - if (taosProcRun(pWrapper->pProc) != 0) { + if (taosProcRun(pWrapper->procObj) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; } @@ -120,9 +120,9 @@ static int32_t dndOpenNodeImp(SMgmtWrapper *pWrapper) { int32_t dndOpenNode(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; - if (pDnode->procType == PROC_SINGLE) { + if (pDnode->ptype == DND_PROC_SINGLE) { return dndOpenNodeImp(pWrapper); - } else if (pDnode->procType == PROC_PARENT) { + } else if (pDnode->ptype == DND_PROC_PARENT) { if (dndInitNodeProc(pWrapper) != 0) return -1; if (dndWriteShmFile(pDnode) != 0) return -1; if (dndRunNodeProc(pWrapper) != 0) return -1; @@ -144,15 +144,15 @@ static void dndCloseNodeImp(SMgmtWrapper *pWrapper) { taosMsleep(10); } - if (pWrapper->pProc) { - taosProcCleanup(pWrapper->pProc); - pWrapper->pProc = NULL; + if (pWrapper->procObj) { + taosProcCleanup(pWrapper->procObj); + pWrapper->procObj = NULL; } dDebug("node:%s, mgmt has been closed", pWrapper->name); } void dndCloseNode(SMgmtWrapper *pWrapper) { - if (pWrapper->pDnode->procType == PROC_PARENT) { + if (pWrapper->pDnode->ptype == DND_PROC_PARENT) { if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); taosKillProc(pWrapper->procId); @@ -172,9 +172,9 @@ static void dndProcessProcHandle(void *handle) { static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("dnode run in single process"); - pDnode->procType = PROC_SINGLE; + pDnode->ptype = DND_PROC_SINGLE; - for (EDndType n = DNODE; n < NODE_MAX; ++n) { + for (EDndNodeType n = NODE_BEGIN; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; @@ -187,7 +187,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { dndSetStatus(pDnode, DND_STAT_RUNNING); - for (EDndType n = 0; n < NODE_MAX; ++n) { + for (EDndNodeType n = 0; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; if (pWrapper->fp.startFp == NULL) continue; @@ -213,15 +213,15 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { static int32_t dndRunInParentProcess(SDnode *pDnode) { dInfo("dnode run in parent process"); - pDnode->procType = PROC_PARENT; + pDnode->ptype = DND_PROC_PARENT; - SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE]; + SMgmtWrapper *pDWrapper = &pDnode->wrappers[NODE_BEGIN]; if (dndOpenNodeImp(pDWrapper) != 0) { dError("node:%s, failed to start since %s", pDWrapper->name, terrstr()); return -1; } - for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; @@ -233,7 +233,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { return -1; } - for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; if (dndRunNodeProc(pWrapper) != 0) return -1; @@ -254,10 +254,10 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { dInfo("dnode is about to stop"); dndSetStatus(pDnode, DND_STAT_STOPPED); - for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; - if (pDnode->ntype == NODE_MAX) continue; + if (pDnode->ntype == NODE_END) continue; if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); @@ -269,14 +269,14 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { } break; } else { - for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; - if (pDnode->ntype == NODE_MAX) continue; + if (pDnode->ntype == NODE_END) continue; if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); - taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle); + taosProcCloseHandles(pWrapper->procObj, dndProcessProcHandle); dndNewNodeProc(pWrapper, n); } } @@ -291,7 +291,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { static int32_t dndRunInChildProcess(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; dInfo("%s run in child process", pWrapper->name); - pDnode->procType = PROC_CHILD; + pDnode->ptype = DND_PROC_CHILD; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) { @@ -301,7 +301,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { SMsgCb msgCb = dndCreateMsgcb(pWrapper); tmsgSetDefaultMsgCb(&msgCb); - pWrapper->procType = PROC_CHILD; + pWrapper->procType = DND_PROC_CHILD; if (dndOpenNodeImp(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); @@ -310,8 +310,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { SProcCfg cfg = dndGenProcCfg(pWrapper); cfg.isChild = true; - pWrapper->pProc = taosProcInit(&cfg); - if (pWrapper->pProc == NULL) { + pWrapper->procObj = taosProcInit(&cfg); + if (pWrapper->procObj == NULL) { dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); return -1; } @@ -325,7 +325,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { dndSetStatus(pDnode, DND_STAT_RUNNING); - if (taosProcRun(pWrapper->pProc) != 0) { + if (taosProcRun(pWrapper->procObj) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; } @@ -347,7 +347,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { int32_t dndRun(SDnode *pDnode) { if (!tsMultiProcess) { return dndRunInSingleProcess(pDnode); - } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { + } else if (pDnode->ntype == NODE_BEGIN || pDnode->ntype == NODE_END) { return dndRunInParentProcess(pDnode); } else { return dndRunInChildProcess(pDnode); diff --git a/source/dnode/mgmt/main/dndFile.c b/source/dnode/mgmt/main/dndFile.c index 905624f072aa538d2bd005d8ebddb5ed7a2d56b5..45d80543a10f344d4f0e023cf4a99ef187ce6ae0 100644 --- a/source/dnode/mgmt/main/dndFile.c +++ b/source/dnode/mgmt/main/dndFile.c @@ -148,7 +148,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { cJSON *root = NULL; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%s.shmfile", pDnode->data.dataDir, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { dDebug("file %s not exist", file); @@ -164,37 +164,37 @@ int32_t dndReadShmFile(SDnode *pDnode) { goto _OVER; } - for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { + for (EDndNodeType ntype = NODE_BEGIN + 1; ntype < NODE_END; ++ntype) { snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); cJSON *shmid = cJSON_GetObjectItem(root, itemName); if (shmid && shmid->type == cJSON_Number) { - pDnode->wrappers[ntype].shm.id = shmid->valueint; + pDnode->wrappers[ntype].procShm.id = shmid->valueint; } snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype)); cJSON *shmsize = cJSON_GetObjectItem(root, itemName); if (shmsize && shmsize->type == cJSON_Number) { - pDnode->wrappers[ntype].shm.size = shmsize->valueint; + pDnode->wrappers[ntype].procShm.size = shmsize->valueint; } } } - if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { - for (EDndType ntype = DNODE; ntype < NODE_MAX; ++ntype) { + if (!tsMultiProcess || pDnode->ntype == NODE_BEGIN || pDnode->ntype == NODE_END) { + for (EDndNodeType ntype = NODE_BEGIN; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - if (pWrapper->shm.id >= 0) { - dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); - taosDropShm(&pWrapper->shm); + if (pWrapper->procShm.id >= 0) { + dDebug("shmid:%d, is closed, size:%d", pWrapper->procShm.id, pWrapper->procShm.size); + taosDropShm(&pWrapper->procShm); } } } else { SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; - if (taosAttachShm(&pWrapper->shm) != 0) { + if (taosAttachShm(&pWrapper->procShm) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("shmid:%d, failed to attach shm since %s", pWrapper->shm.id, terrstr()); + dError("shmid:%d, failed to attach shm since %s", pWrapper->procShm.id, terrstr()); goto _OVER; } - dInfo("node:%s, shmid:%d is attached, size:%d", pWrapper->name, pWrapper->shm.id, pWrapper->shm.size); + dInfo("node:%s, shmid:%d is attached, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size); } dDebug("successed to load %s", file); @@ -215,8 +215,8 @@ int32_t dndWriteShmFile(SDnode *pDnode) { char realfile[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->dataDir, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->data.dataDir, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->data.dataDir, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -226,13 +226,13 @@ int32_t dndWriteShmFile(SDnode *pDnode) { } len += snprintf(content + len, MAXLEN - len, "{\n"); - for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { + for (EDndNodeType ntype = NODE_BEGIN + 1; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); - if (ntype == NODE_MAX - 1) { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dndNodeProcStr(ntype), pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->procShm.id); + if (ntype == NODE_END - 1) { + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dndNodeProcStr(ntype), pWrapper->procShm.size); } else { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndNodeProcStr(ntype), pWrapper->procShm.size); } } len += snprintf(content + len, MAXLEN - len, "}\n"); diff --git a/source/dnode/mgmt/main/dndInt.c b/source/dnode/mgmt/main/dndObj.c similarity index 72% rename from source/dnode/mgmt/main/dndInt.c rename to source/dnode/mgmt/main/dndObj.c index d406b0c02e99d438d20539f0125ac589a23c77ed..ff2be4272266017d0ce7e7a870ffe2ea98fc85c7 100644 --- a/source/dnode/mgmt/main/dndInt.c +++ b/source/dnode/mgmt/main/dndObj.c @@ -17,27 +17,27 @@ #include "dndInt.h" static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { - pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; - pDnode->serverPort = pOption->serverPort; - pDnode->dataDir = strdup(pOption->dataDir); - pDnode->localEp = strdup(pOption->localEp); - pDnode->localFqdn = strdup(pOption->localFqdn); - pDnode->firstEp = strdup(pOption->firstEp); - pDnode->secondEp = strdup(pOption->secondEp); - pDnode->disks = pOption->disks; - pDnode->numOfDisks = pOption->numOfDisks; + pDnode->data.supportVnodes = pOption->numOfSupportVnodes; + pDnode->data.serverPort = pOption->serverPort; + pDnode->data.dataDir = strdup(pOption->dataDir); + pDnode->data.localEp = strdup(pOption->localEp); + pDnode->data.localFqdn = strdup(pOption->localFqdn); + pDnode->data.firstEp = strdup(pOption->firstEp); + pDnode->data.secondEp = strdup(pOption->secondEp); + pDnode->data.disks = pOption->disks; + pDnode->data.numOfDisks = pOption->numOfDisks; pDnode->ntype = pOption->ntype; - pDnode->rebootTime = taosGetTimestampMs(); + pDnode->data.rebootTime = taosGetTimestampMs(); - if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || - pDnode->secondEp == NULL) { + if (pDnode->data.dataDir == NULL || pDnode->data.localEp == NULL || pDnode->data.localFqdn == NULL || pDnode->data.firstEp == NULL || + pDnode->data.secondEp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { - pDnode->lockfile = dndCheckRunning(pDnode->dataDir); - if (pDnode->lockfile == NULL) { + if (!tsMultiProcess || pDnode->ntype == NODE_BEGIN || pDnode->ntype == NODE_END) { + pDnode->data.lockfile = dndCheckRunning(pDnode->data.dataDir); + if (pDnode->data.lockfile == NULL) { return -1; } } @@ -46,20 +46,20 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { } static void dndClearVars(SDnode *pDnode) { - for (EDndType n = 0; n < NODE_MAX; ++n) { + for (EDndNodeType n = 0; n < NODE_END; ++n) { SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; taosMemoryFreeClear(pMgmt->path); } - if (pDnode->lockfile != NULL) { - taosUnLockFile(pDnode->lockfile); - taosCloseFile(&pDnode->lockfile); - pDnode->lockfile = NULL; + if (pDnode->data.lockfile != NULL) { + taosUnLockFile(pDnode->data.lockfile); + taosCloseFile(&pDnode->data.lockfile); + pDnode->data.lockfile = NULL; } - taosMemoryFreeClear(pDnode->localEp); - taosMemoryFreeClear(pDnode->localFqdn); - taosMemoryFreeClear(pDnode->firstEp); - taosMemoryFreeClear(pDnode->secondEp); - taosMemoryFreeClear(pDnode->dataDir); + taosMemoryFreeClear(pDnode->data.localEp); + taosMemoryFreeClear(pDnode->data.localFqdn); + taosMemoryFreeClear(pDnode->data.firstEp); + taosMemoryFreeClear(pDnode->data.secondEp); + taosMemoryFreeClear(pDnode->data.dataDir); taosMemoryFree(pDnode); dDebug("dnode memory is cleared, data:%p", pDnode); } @@ -82,18 +82,18 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { } dndSetStatus(pDnode, DND_STAT_INIT); - dmSetMgmtFp(&pDnode->wrappers[DNODE]); + dmSetMgmtFp(&pDnode->wrappers[NODE_BEGIN]); mmSetMgmtFp(&pDnode->wrappers[MNODE]); - vmSetMgmtFp(&pDnode->wrappers[VNODES]); + vmSetMgmtFp(&pDnode->wrappers[VNODE]); qmSetMgmtFp(&pDnode->wrappers[QNODE]); smSetMgmtFp(&pDnode->wrappers[SNODE]); bmSetMgmtFp(&pDnode->wrappers[BNODE]); - for (EDndType n = 0; n < NODE_MAX; ++n) { + for (EDndNodeType n = 0; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); + snprintf(path, sizeof(path), "%s%s%s", pDnode->data.dataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); - pWrapper->shm.id = -1; + pWrapper->procShm.id = -1; pWrapper->pDnode = pDnode; pWrapper->ntype = n; if (pWrapper->path == NULL) { @@ -101,7 +101,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { goto _OVER; } - pWrapper->procType = PROC_SINGLE; + pWrapper->procType = DND_PROC_SINGLE; taosInitRWLatch(&pWrapper->latch); } @@ -134,7 +134,7 @@ _OVER: void dndClose(SDnode *pDnode) { if (pDnode == NULL) return; - for (EDndType n = 0; n < NODE_MAX; ++n) { + for (EDndNodeType n = 0; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; dndCloseNode(pWrapper); } @@ -149,7 +149,7 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) { } } -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType ntype) { +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pRetWrapper = pWrapper; @@ -170,7 +170,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { int32_t code = 0; taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) { + if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); } else { @@ -196,15 +196,6 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; } -EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } - -void dndSetStatus(SDnode *pDnode, EDndStatus status) { - if (pDnode->status != status) { - dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status)); - pDnode->status = status; - } -} - void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { SStartupReq *pStartup = &pDnode->startup; tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); diff --git a/source/dnode/mgmt/main/dndTransport.c b/source/dnode/mgmt/main/dndTransport.c index bcebd521b604027a4d0f4d9a36453f97ea79c5df..c5f195d71e0e2cebe381812965fa377ed72d7e5b 100644 --- a/source/dnode/mgmt/main/dndTransport.c +++ b/source/dnode/mgmt/main/dndTransport.c @@ -21,7 +21,7 @@ #define INTERNAL_SECRET "_pwd" static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[DNODE]; + SMgmtWrapper *pWrapper = &pDnode->wrappers[NODE_BEGIN]; dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet); } @@ -53,7 +53,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS int32_t code = -1; SNodeMsg *pMsg = NULL; NodeMsgFp msgFp = NULL; - uint16_t msgType = pRpc->msgType; + uint16_t msgType = pRpc->msgType; if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) { dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); @@ -64,12 +64,12 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER; if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER; - if (pWrapper->procType == PROC_SINGLE) { + if (pWrapper->procType == DND_PROC_SINGLE) { dTrace("msg:%p, is created, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); code = (*msgFp)(pWrapper, pMsg); - } else if (pWrapper->procType == PROC_PARENT) { + } else if (pWrapper->procType == DND_PROC_PARENT) { dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); - code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, + code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, PROC_REQ); } else { dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); @@ -78,7 +78,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS _OVER: if (code == 0) { - if (pWrapper->procType == PROC_PARENT) { + if (pWrapper->procType == DND_PROC_PARENT) { dTrace("msg:%p, is freed in parent process", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); @@ -105,11 +105,11 @@ _OVER: } static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - STransMgmt *pMgmt = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; tmsg_t msgType = pMsg->msgType; bool isReq = msgType & 1u; - SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - SMgmtWrapper *pWrapper = pHandle->pWrapper; + SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; + SMgmtWrapper *pWrapper = pHandle->pNdWrapper; if (msgType == TDMT_DND_NETWORK_TEST) { dTrace("network test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); @@ -159,7 +159,7 @@ static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { } static int32_t dndInitClient(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -178,8 +178,8 @@ static int32_t dndInitClient(SDnode *pDnode) { taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); rpcInit.secret = pass; - pMgmt->clientRpc = rpcOpen(&rpcInit); - if (pMgmt->clientRpc == NULL) { + pTrans->clientRpc = rpcOpen(&rpcInit); + if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client"); return -1; } @@ -189,17 +189,17 @@ static int32_t dndInitClient(SDnode *pDnode) { } static void dndCleanupClient(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->trans; - if (pMgmt->clientRpc) { - rpcClose(pMgmt->clientRpc); - pMgmt->clientRpc = NULL; + SDnodeTrans *pTrans = &pDnode->trans; + if (pTrans->clientRpc) { + rpcClose(pTrans->clientRpc); + pTrans->clientRpc = NULL; dDebug("dnode rpc client is closed"); } } static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { SEpSet epSet = {0}; - SMgmtWrapper *pWrapper = &pDnode->wrappers[DNODE]; + SMgmtWrapper *pWrapper = &pDnode->wrappers[NODE_BEGIN]; dmGetMnodeEpSet(pWrapper->pMgmt, &epSet); rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp); } @@ -263,11 +263,11 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch } static int32_t dndInitServer(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = pDnode->serverPort; + rpcInit.localPort = pDnode->data.serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dndProcessMsg; @@ -277,8 +277,8 @@ static int32_t dndInitServer(SDnode *pDnode) { rpcInit.afp = (RpcAfp)dndRetrieveUserAuthInfo; rpcInit.parent = pDnode; - pMgmt->serverRpc = rpcOpen(&rpcInit); - if (pMgmt->serverRpc == NULL) { + pTrans->serverRpc = rpcOpen(&rpcInit); + if (pTrans->serverRpc == NULL) { dError("failed to init dnode rpc server"); return -1; } @@ -288,10 +288,10 @@ static int32_t dndInitServer(SDnode *pDnode) { } static void dndCleanupServer(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->trans; - if (pMgmt->serverRpc) { - rpcClose(pMgmt->serverRpc); - pMgmt->serverRpc = NULL; + SDnodeTrans *pTrans = &pDnode->trans; + if (pTrans->serverRpc) { + rpcClose(pTrans->serverRpc); + pTrans->serverRpc = NULL; dDebug("dnode rpc server is closed"); } } @@ -308,9 +308,9 @@ void dndCleanupTrans(SDnode *pDnode) { } int32_t dndInitMsgHandle(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; - for (EDndType n = 0; n < NODE_MAX; ++n) { + for (EDndNodeType n = 0; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { @@ -318,7 +318,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { int8_t vgId = pWrapper->msgVgIds[msgIndex]; if (msgFp == NULL) continue; - SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; + SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex]; if (vgId == QNODE_HANDLE) { if (pHandle->pQndWrapper != NULL) { dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); @@ -332,11 +332,11 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { } pHandle->pMndWrapper = pWrapper; } else { - if (pHandle->pWrapper != NULL) { + if (pHandle->pNdWrapper != NULL) { dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); return -1; } - pHandle->pWrapper = pWrapper; + pHandle->pNdWrapper = pWrapper; } } } @@ -344,13 +344,13 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { return 0; } -static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pMgmt->clientRpc == NULL) { +static int32_t dndSendRpcReq(SDnodeTrans *pTrans, const SEpSet *pEpSet, SRpcMsg *pReq) { + if (pTrans->clientRpc == NULL) { terrno = TSDB_CODE_NODE_OFFLINE; return -1; } - rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL); + rpcSendRequest(pTrans->clientRpc, pEpSet, pReq, NULL); return 0; } @@ -369,7 +369,7 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg return -1; } - if (pWrapper->procType != PROC_CHILD) { + if (pWrapper->procType != DND_PROC_CHILD) { return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq); } else { char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet)); @@ -380,7 +380,7 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg memcpy(pHead, pReq, sizeof(SRpcMsg)); memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); - taosProcPutToParentQ(pWrapper->pProc, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen, + taosProcPutToParentQ(pWrapper->procObj, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen, PROC_REQ); taosMemoryFree(pHead); return 0; @@ -388,27 +388,27 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg } static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { - if (pWrapper->procType != PROC_CHILD) { + if (pWrapper->procType != DND_PROC_CHILD) { dndSendRpcRsp(pWrapper, pRsp); } else { - taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); + taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); } } static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { - if (pWrapper->procType != PROC_CHILD) { + if (pWrapper->procType != DND_PROC_CHILD) { rpcRegisterBrokenLinkArg(pMsg); } else { - taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST); + taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST); } } static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { - if (pWrapper->procType != PROC_CHILD) { + if (pWrapper->procType != DND_PROC_CHILD) { rpcReleaseHandle(handle, type); } else { SRpcMsg msg = {.handle = handle, .code = type}; - taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE); + taosProcPutToParentQ(pWrapper->procObj, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE); } } @@ -456,7 +456,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t rpcRegisterBrokenLinkArg(pMsg); break; case PROC_RELEASE: - taosProcRemoveHandle(pWrapper->pProc, pMsg->handle); + taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); rpcFreeCont(pCont); break; @@ -464,7 +464,7 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); break; case PROC_RSP: - taosProcRemoveHandle(pWrapper->pProc, pMsg->handle); + taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); dndSendRpcRsp(pWrapper, pMsg); break; default: @@ -484,7 +484,7 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .shm = pWrapper->shm, + .shm = pWrapper->procShm, .parent = pWrapper, .name = pWrapper->name}; return cfg; diff --git a/source/dnode/mgmt/mm/mmHandle.c b/source/dnode/mgmt/mm/mmHandle.c index 63240c32249033ba254617ee4f09a92035b52697..08f1ff1cfafd657a32bbb842720b2dfcb8eb0904 100644 --- a/source/dnode/mgmt/mm/mmHandle.c +++ b/source/dnode/mgmt/mm/mmHandle.c @@ -56,7 +56,7 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (createReq.replica <= 1 || createReq.dnodeId != pDnode->dnodeId) { + if (createReq.replica <= 1 || createReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; @@ -75,7 +75,7 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pDnode->dnodeId) { + if (dropReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop mnode since %s", terrstr()); return -1; @@ -95,9 +95,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - if (alterReq.dnodeId != pDnode->dnodeId) { + if (alterReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to alter mnode since %s, dnodeId:%d input:%d", terrstr(), pDnode->dnodeId, alterReq.dnodeId); + dError("failed to alter mnode since %s, dnodeId:%d input:%d", terrstr(), pDnode->data.dnodeId, alterReq.dnodeId); return -1; } else { return mmAlter(pMgmt, &alterReq); diff --git a/source/dnode/mgmt/mm/mmInt.c b/source/dnode/mgmt/mm/mmInt.c index 49886621ecde417f0a80cd9482428e331f23e2a1..374b33ed0156707f3e5b9924e5be8987a1531bec 100644 --- a/source/dnode/mgmt/mm/mmInt.c +++ b/source/dnode/mgmt/mm/mmInt.c @@ -18,9 +18,9 @@ #include "wal.h" static bool mmDeployRequired(SDnode *pDnode) { - if (pDnode->dnodeId > 0) return false; - if (pDnode->clusterId > 0) return false; - if (strcmp(pDnode->localEp, pDnode->firstEp) != 0) return false; + if (pDnode->data.dnodeId > 0) return false; + if (pDnode->data.clusterId > 0) return false; + if (strcmp(pDnode->data.localEp, pDnode->data.firstEp) != 0) return false; return true; } @@ -53,8 +53,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->selfIndex = 0; SReplica *pReplica = &pOption->replicas[0]; pReplica->id = 1; - pReplica->port = pMgmt->pDnode->serverPort; - tstrncpy(pReplica->fqdn, pMgmt->pDnode->localFqdn, TSDB_FQDN_LEN); + pReplica->port = pMgmt->pDnode->data.serverPort; + tstrncpy(pReplica->fqdn, pMgmt->pDnode->data.localFqdn, TSDB_FQDN_LEN); pOption->deploy = true; pMgmt->selfIndex = pOption->selfIndex; @@ -80,7 +80,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre pReplica->id = pCreate->replicas[i].id; pReplica->port = pCreate->replicas[i].port; memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pMgmt->pDnode->dnodeId) { + if (pReplica->id == pMgmt->pDnode->data.dnodeId) { pOption->selfIndex = i; } } @@ -112,8 +112,8 @@ static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) { if (!deployed) { dInfo("mnode start to deploy"); - if (pMgmt->pWrapper->procType == PROC_CHILD) { - pMgmt->pDnode->dnodeId = 1; + if (pMgmt->pWrapper->procType == DND_PROC_CHILD) { + pMgmt->pDnode->data.dnodeId = 1; } mmBuildOptionForDeploy(pMgmt, &option); } else { @@ -230,8 +230,8 @@ void mmSetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.openFp = mmOpen; mgmtFp.closeFp = mmClose; mgmtFp.startFp = mmStart; - mgmtFp.createMsgFp = mmProcessCreateReq; - mgmtFp.dropMsgFp = mmProcessDropReq; + mgmtFp.createFp = mmProcessCreateReq; + mgmtFp.dropFp = mmProcessDropReq; mgmtFp.requiredFp = mmRequire; mmInitMsgHandle(pWrapper); diff --git a/source/dnode/mgmt/qm/qmHandle.c b/source/dnode/mgmt/qm/qmHandle.c index 4fda72759ae94d881226433acc7c73478b7190a4..af1f903f7e27a5cb9a61ab5458c441dd4daaa5df 100644 --- a/source/dnode/mgmt/qm/qmHandle.c +++ b/source/dnode/mgmt/qm/qmHandle.c @@ -53,7 +53,7 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (createReq.dnodeId != pDnode->dnodeId) { + if (createReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create qnode since %s", terrstr()); return -1; @@ -72,7 +72,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pDnode->dnodeId) { + if (dropReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop qnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/qm/qmInt.c b/source/dnode/mgmt/qm/qmInt.c index 585a7fb18369ba2a4728b9dcb87706da5131292c..cb06fe5d25a6df6f760c3ff41ff3dd88d71e802d 100644 --- a/source/dnode/mgmt/qm/qmInt.c +++ b/source/dnode/mgmt/qm/qmInt.c @@ -116,8 +116,8 @@ void qmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = qmOpen; mgmtFp.closeFp = qmClose; - mgmtFp.createMsgFp = qmProcessCreateReq; - mgmtFp.dropMsgFp = qmProcessDropReq; + mgmtFp.createFp = qmProcessCreateReq; + mgmtFp.dropFp = qmProcessDropReq; mgmtFp.requiredFp = qmRequire; qmInitMsgHandle(pWrapper); diff --git a/source/dnode/mgmt/sm/smHandle.c b/source/dnode/mgmt/sm/smHandle.c index 5b30dc04bc8d979168b9b008d922d40d44cb3471..0f9bb5369d407ed4025dd64a6458d3d8e7fcbd3e 100644 --- a/source/dnode/mgmt/sm/smHandle.c +++ b/source/dnode/mgmt/sm/smHandle.c @@ -53,7 +53,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (createReq.dnodeId != pDnode->dnodeId) { + if (createReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create snode since %s", terrstr()); return -1; @@ -72,7 +72,7 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return -1; } - if (dropReq.dnodeId != pDnode->dnodeId) { + if (dropReq.dnodeId != pDnode->data.dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to drop snode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/sm/smInt.c b/source/dnode/mgmt/sm/smInt.c index ef4e95d915de8afa3d9f660884a3b0637240d7e7..02958104a0eff1dc2be7921ae91b56231d1f47cf 100644 --- a/source/dnode/mgmt/sm/smInt.c +++ b/source/dnode/mgmt/sm/smInt.c @@ -113,8 +113,8 @@ void smSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = smOpen; mgmtFp.closeFp = smClose; - mgmtFp.createMsgFp = smProcessCreateReq; - mgmtFp.dropMsgFp = smProcessDropReq; + mgmtFp.createFp = smProcessCreateReq; + mgmtFp.dropFp = smProcessDropReq; mgmtFp.requiredFp = smRequire; smInitMsgHandle(pWrapper); diff --git a/source/dnode/mgmt/vm/vmInt.c b/source/dnode/mgmt/vm/vmInt.c index 6a1a5c3987ccb50c1a0008e29298e2d67148061b..b298b4c93a37e0720ce4d8da76d6205b5a2cc23b 100644 --- a/source/dnode/mgmt/vm/vmInt.c +++ b/source/dnode/mgmt/vm/vmInt.c @@ -278,11 +278,11 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { taosInitRWLatch(&pMgmt->latch); SDiskCfg dCfg = {0}; - tstrncpy(dCfg.dir, pDnode->dataDir, TSDB_FILENAME_LEN); + tstrncpy(dCfg.dir, pDnode->data.dataDir, TSDB_FILENAME_LEN); dCfg.level = 0; dCfg.primary = 1; - SDiskCfg *pDisks = pDnode->disks; - int32_t numOfDisks = pDnode->numOfDisks; + SDiskCfg *pDisks = pDnode->data.disks; + int32_t numOfDisks = pDnode->data.numOfDisks; if (numOfDisks <= 0 || pDisks == NULL) { pDisks = &dCfg; numOfDisks = 1; @@ -329,7 +329,7 @@ _OVER: static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) { SDnode *pDnode = pWrapper->pDnode; - *required = pDnode->numOfSupportVnodes > 0; + *required = pDnode->data.supportVnodes > 0; return 0; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 86ec49127a5919ca45c291e97d5c91507e3568cd..85f0728076d87929a4faae758d713f80a343c4ce 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -354,7 +354,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) { int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - bool dnodeChanged = (statusReq.dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); + bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; @@ -405,7 +405,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) { pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes; SStatusRsp statusRsp = {0}; - statusRsp.dver = sdbGetTableVer(pMnode->pSdb, SDB_DNODE); + statusRsp.dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE); statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.clusterId = pMnode->clusterId; statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));