提交 692f5f0e 编写于 作者: D dapan1121

feat: show dnode variables

上级 55c524e8
......@@ -816,6 +816,15 @@ int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
typedef struct {
SArray* dnodeList; // SArray<SEpSet>
} SDnodeListRsp;
int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp;
......
......@@ -101,6 +101,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DNODE_LIST, "dnode-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL)
......
......@@ -2056,6 +2056,50 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->qnodeList); }
int32_t tSerializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->dnodeList);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SEpSet *pEpSet = taosArrayGet(pRsp->dnodeList, i);
if (tEncodeSEpSet(&encoder, pEpSet) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (NULL == pRsp->dnodeList) {
pRsp->dnodeList = taosArrayInit(num, sizeof(SEpSet));
if (NULL == pRsp->dnodeList) return -1;
}
for (int32_t i = 0; i < num; ++i) {
SEpSet epSet = {0};
if (tDecodeSEpSet(&decoder, &epSet) < 0) return -1;
taosArrayPush(pRsp->dnodeList, &epSet);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -161,6 +161,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -75,6 +75,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
......@@ -497,6 +498,60 @@ _OVER:
return code;
}
static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SDnodeObj *pObj = NULL;
void *pIter = NULL;
SDnodeListRsp rsp = {0};
int32_t code = -1;
rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == rsp.dnodeList) {
mError("failed to alloc epSet while process dnode list req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
while (1) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
SEpSet epSet = {0};
epSet.numOfEps = 1;
tstrncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->port;
(void)taosArrayPush(rsp.dnodeList, &epSet);
sdbRelease(pSdb, pObj);
}
int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSDnodeListRsp(pRsp, rspLen, &rsp);
pReq->info.rspLen = rspLen;
pReq->info.rsp = pRsp;
code = 0;
_OVER:
if (code != 0) {
mError("failed to get dnode list since %s", terrstr());
}
tFreeSDnodeListRsp(&rsp);
return code;
}
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
......
......@@ -201,6 +201,7 @@ typedef struct SCtgJob {
int32_t dbVgNum;
int32_t udfNum;
int32_t qnodeNum;
int32_t dnodeNum;
int32_t dbCfgNum;
int32_t indexNum;
int32_t userNum;
......
......@@ -1121,8 +1121,19 @@ _return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray** pDnodeList) {
return TSDB_CODE_CTG_INVALID_INPUT;
int32_t catalogGetDnodeList(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** pDnodeList) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == pConn || NULL == pDnodeList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_ERR_JRET(ctgGetDnodeListFromMnode(pCtg, pConn, pDnodeList, NULL));
_return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) {
......
......@@ -353,13 +353,14 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
int32_t tbHashNum = (int32_t)taosArrayGetSize(pReq->pTableHash);
int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum;
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum;
if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_SUCCESS;
......@@ -382,6 +383,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
pJob->tbMetaNum = tbMetaNum;
pJob->tbHashNum = tbHashNum;
pJob->qnodeNum = qnodeNum;
pJob->dnodeNum = dnodeNum;
pJob->dbVgNum = dbVgNum;
pJob->udfNum = udfNum;
pJob->dbCfgNum = dbCfgNum;
......
......@@ -276,9 +276,6 @@ _return:
CTG_RET(code);
}
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
......@@ -316,6 +313,43 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
char *msg = NULL;
......
......@@ -126,6 +126,18 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
*msg = NULL;
*msgLen = 0;
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
......@@ -407,6 +419,28 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
return code;
}
int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
SDnodeListRsp out = {0};
int32_t code = 0;
if (NULL == output || NULL == msg || msgSize <= 0) {
code = TSDB_CODE_TSC_INVALID_INPUT;
return code;
}
out.dnodeList = (SArray *)output;
if (tDeserializeSDnodeListRsp(msg, msgSize, &out) != 0) {
qError("invalid dnode list rsp msg, msgSize:%d", msgSize);
code = TSDB_CODE_INVALID_MSG;
return code;
}
*(SArray**)output = out.dnodeList;
return code;
}
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
SDbCfgRsp out = {0};
......@@ -499,6 +533,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryBuildDnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
......@@ -509,6 +544,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryProcessDnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册