diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d289ff13aedb55db408d3020ba356a5160dd7ead..a4bc1aebae3a879e5a595f83250bcaa314bf5747 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -510,7 +510,8 @@ typedef struct { int8_t superUser; int8_t connType; SEpSet epSet; - char sVersion[128]; + char sVer[TSDB_VERSION_LEN]; + char sDetailVer[128]; } SConnectRsp; int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); @@ -836,6 +837,20 @@ typedef struct { int32_t tSerializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq); int32_t tDeserializeSDnodeListReq(void* buf, int32_t bufLen, SDnodeListReq* pReq); +typedef struct { + int32_t useless; // useless +} SServerVerReq; + +int32_t tSerializeSServerVerReq(void* buf, int32_t bufLen, SServerVerReq* pReq); +int32_t tDeserializeSServerVerReq(void* buf, int32_t bufLen, SServerVerReq* pReq); + +typedef struct { + char ver[TSDB_VERSION_LEN]; +} SServerVerRsp; + +int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp); +int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp); + typedef struct SQueryNodeAddr { int32_t nodeId; // vgId or qnodeId @@ -1229,6 +1244,21 @@ typedef struct { int32_t tSerializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq); int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq); +typedef struct { + char name[TSDB_CONFIG_OPTION_LEN + 1]; + char value[TSDB_CONFIG_VALUE_LEN + 1]; +} SVariablesInfo; + +typedef struct { + SArray *variables; //SArray +} SShowVariablesRsp; + +int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq); +int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq); + +void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp); + + /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1babb450038e692ed4d2241e9ad4a096a38c6ba7..acf08bd47e3cf9ae756fae10aab6ed9a4b23f34c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -163,6 +163,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 20f4765190bb434c363c8cd2d2e35a7c0716bdfe..8482ae3f3de328c21fc6f8b8748fe65d7b99ce98 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -71,6 +71,7 @@ typedef struct SCatalogReq { SArray* pTableCfg; // element is SNAME bool qNodeRequired; // valid qnode bool dNodeRequired; // valid dnode + bool svrVerRequired; bool forceUpdate; } SCatalogReq; @@ -80,18 +81,19 @@ typedef struct SMetaRes { } SMetaRes; typedef struct SMetaData { - SArray* pDbVgroup; // pRes = SArray* - SArray* pDbCfg; // pRes = SDbCfgInfo* - SArray* pDbInfo; // pRes = SDbInfo* - SArray* pTableMeta; // pRes = STableMeta* - SArray* pTableHash; // pRes = SVgroupInfo* - SArray* pTableIndex; // pRes = SArray* - SArray* pUdfList; // pRes = SFuncInfo* - SArray* pIndex; // pRes = SIndexInfo* - SArray* pUser; // pRes = bool* - SArray* pQnodeList; // pRes = SArray* - SArray* pTableCfg; // pRes = STableCfg* - SArray* pDnodeList; // pRes = SArray* + SArray* pDbVgroup; // pRes = SArray* + SArray* pDbCfg; // pRes = SDbCfgInfo* + SArray* pDbInfo; // pRes = SDbInfo* + SArray* pTableMeta; // pRes = STableMeta* + SArray* pTableHash; // pRes = SVgroupInfo* + SArray* pTableIndex; // pRes = SArray* + SArray* pUdfList; // pRes = SFuncInfo* + SArray* pIndex; // pRes = SIndexInfo* + SArray* pUser; // pRes = bool* + SArray* pQnodeList; // pRes = SArray* + SArray* pTableCfg; // pRes = STableCfg* + SArray* pDnodeList; // pRes = SArray* + SMetaRes* pSvrVer; // pRes = char* } SMetaData; typedef struct SCatalogCfg { @@ -268,7 +270,7 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, c */ int32_t catalogGetAllMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId); +int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId); int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList); @@ -298,6 +300,8 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); +int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo *pConn, char** pVersion); + int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, bool forceUpdate); int32_t catalogClearCache(void); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 4f18cb19c2802f2811354ca15af227878c29ab00..585675c5fde16fc7961d10384b939ae5c1b9c403 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -40,7 +40,6 @@ extern "C" { #define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE) #define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) - #define PRIVILEGE_TYPE_MASK(n) (1 << n) #define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index e45e5bd1608861a82520569df36587bacb8dfc4c..6c2a9bb374c019040b246dac6bb3e8f44da93a57 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -51,6 +51,8 @@ typedef struct SParseContext { bool isSuperUser; bool async; int8_t schemalessType; + const char* svrVer; + bool nodeOffline; } SParseContext; int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 2b918412feb8f8a8fe5cb118c8111c8e7699d1b5..cca79186d06d137a1f0e5ccaa6a0e0fc03d753de 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -54,6 +54,11 @@ enum { RES_TYPE__TMQ_META, }; +#define SHOW_VARIABLES_RESULT_COLS 2 +#define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE) +#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) + + #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) @@ -104,6 +109,8 @@ typedef struct SHeartBeatInfo { struct SAppInstInfo { int64_t numOfConns; SCorEpSet mgmtEp; + int32_t totalDnodes; + int32_t onlineDnodes; TdThreadMutex qnodeMutex; SArray* pQnodeList; SAppClusterSummary summary; @@ -127,7 +134,8 @@ typedef struct STscObj { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_FNAME_LEN]; - char ver[128]; + char sVer[TSDB_VERSION_LEN]; + char sDetailVer[128]; int8_t connType; int32_t acctId; uint32_t connId; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index c6d0d6a860e747a7240b4bab4f9153a07f5ae8c6..86562fea9787eae90de5c72ee247d377761fd626 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -161,6 +161,9 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); } + + pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes; + pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes; pTscObj->connId = pRsp->query->connId; if (pRsp->query->killRid) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 51e709ac58916c3efb5d7c1404687d2ccf3b25b8..b3aaeaea787f664e1d9805c630145021e04ee872 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -178,7 +178,9 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC .pStmtCb = pStmtCb, .pUser = pTscObj->user, .schemalessType = pTscObj->schemalessType, - .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))}; + .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), + .svrVer = pTscObj->sVer, + .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 515136bb27a486d92d3e42c9db2d69318678b75d..f5dfe2de3668e51cd2d95e5b5ddf78d0f5446845 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -623,7 +623,7 @@ const char *taos_get_server_info(TAOS *taos) { releaseTscObj(*(int64_t *)taos); - return pTscObj->ver; + return pTscObj->sDetailVer; } typedef struct SqlParseWrapper { @@ -766,7 +766,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { .requestObjRefId = pCxt->requestRid, .mgmtEps = pCxt->mgmtEpSet}; - code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId, &catalogReq, retrieveMetaCallback, pWrapper, + code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); if (code == TSDB_CODE_SUCCESS) { return; @@ -934,7 +934,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - code = catalogAsyncGetAllMeta(pCtg, &conn, pRequest->requestId, &catalogReq, syncCatalogFn, ¶m, NULL); + code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, ¶m, NULL); if (code) { goto _return; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index db8aebb32293de4e20c95b96062283d4187e867e..45a525d124feace87a4917beec54b5035d539748 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -82,7 +82,8 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connId = connectRsp.connId; pTscObj->acctId = connectRsp.acctId; - tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver)); + tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer)); + tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer)); // update the appInstInfo pTscObj->pAppInfo->clusterId = connectRsp.clusterId; @@ -287,6 +288,103 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } +static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) { + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + pBlock->info.hasVarCol = true; + + pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData)); + + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN; + + taosArrayPush(pBlock->pDataBlock, &infoData); + + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN; + taosArrayPush(pBlock->pDataBlock, &infoData); + + int32_t numOfCfg = taosArrayGetSize(pVars); + blockDataEnsureCapacity(pBlock, numOfCfg); + + for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { + SVariablesInfo *pInfo = taosArrayGet(pVars, i); + + char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++); + colDataAppend(pColInfo, i, name, false); + + char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE); + pColInfo = taosArrayGet(pBlock->pDataBlock, c++); + colDataAppend(pColInfo, i, value, false); + } + + pBlock->info.rows = numOfCfg; + + *block = pBlock; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { + SSDataBlock* pBlock = NULL; + int32_t code = buildShowVariablesBlock(pVars, &pBlock); + if (code) { + return code; + } + + size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + *pRsp = taosMemoryCalloc(1, rspSize); + if (NULL == *pRsp) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pRsp)->useconds = 0; + (*pRsp)->completed = 1; + (*pRsp)->precision = 0; + (*pRsp)->compressed = 0; + (*pRsp)->compLen = 0; + (*pRsp)->numOfRows = htonl(pBlock->info.rows); + (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); + + int32_t len = 0; + blockCompressEncode(pBlock, (*pRsp)->data, &len, SHOW_VARIABLES_RESULT_COLS, false); + ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); + + blockDataDestroy(pBlock); + return TSDB_CODE_SUCCESS; +} + +int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + } else { + SShowVariablesRsp rsp = {0}; + SRetrieveTableRsp* pRes = NULL; + code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp); + if (TSDB_CODE_SUCCESS == code) { + code = buildShowVariablesRsp(rsp.variables, &pRes); + } + if (TSDB_CODE_SUCCESS == code) { + code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false); + } + + tFreeSShowVariablesRsp(&rsp); + } + + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } else { + tsem_post(&pRequest->body.rspSem); + } + return code; +} + + __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { switch (msgType) { case TDMT_MND_CONNECT: @@ -301,6 +399,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { return processDropDbRsp; case TDMT_MND_ALTER_STB: return processAlterStbRsp; + case TDMT_MND_SHOW_VARIABLES: + return processShowVariablesRsp; default: return genericRspCallback; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4ad11f2998df6a6ee5436e0895c90bfcdab458b2..2c6b94f17c0c2368c41209757a88d28ea0dfb578 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2246,6 +2246,56 @@ int32_t tDeserializeSDnodeListReq(void *buf, int32_t bufLen, SDnodeListReq *pReq return 0; } +int32_t tSerializeSServerVerReq(void *buf, int32_t bufLen, SServerVerReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->useless) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSServerVerReq(void *buf, int32_t bufLen, SServerVerReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->ver) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->ver) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) { SEncoder encoder = {0}; @@ -2855,6 +2905,67 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR return 0; } +int32_t tEncodeSVariablesInfo(SEncoder* pEncoder, SVariablesInfo* pInfo) { + if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1; + if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1; + return 0; +} + +int32_t tDecodeSVariablesInfo(SDecoder* pDecoder, SVariablesInfo* pInfo) { + if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1; + return 0; +} + + +int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + int32_t varNum = taosArrayGetSize(pRsp->variables); + if (tEncodeI32(&encoder, varNum) < 0) return -1; + for (int32_t i = 0; i < varNum; ++i) { + SVariablesInfo* pInfo = taosArrayGet(pRsp->variables, i); + if (tEncodeSVariablesInfo(&encoder, pInfo) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + int32_t varNum = 0; + if (tDecodeI32(&decoder, &varNum) < 0) return -1; + if (varNum > 0) { + pRsp->variables = taosArrayInit(varNum, sizeof(SVariablesInfo)); + if (NULL == pRsp->variables) return -1; + for (int32_t i = 0; i < varNum; ++i) { + SVariablesInfo info = {0}; + if (tDecodeSVariablesInfo(&decoder, &info) < 0) return -1; + if (NULL == taosArrayPush(pRsp->variables, &info)) return -1; + } + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp) { + if (NULL == pRsp) { + return; + } + + taosArrayDestroy(pRsp->variables); +} + int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -3396,7 +3507,8 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3416,7 +3528,8 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; - if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d44a7d79bfc98d765065fc6bfa19facac8e1d9ae..cb741994b8201e9110f86897d6b3ac774d85b5f0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -206,6 +206,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d3723326b99717ffa17f874a71cae76cee02f4aa..0eab364e90ba1c4b9841466db421f3f74b14582e 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -48,6 +48,7 @@ static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew); static int32_t mndProcessDnodeListReq(SRpcMsg *pReq); +static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq); static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq); @@ -78,6 +79,7 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq); mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq); + mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig); @@ -554,6 +556,60 @@ _OVER: return code; } +static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) { + SShowVariablesRsp rsp = {0}; + int32_t code = -1; + + rsp.variables = taosArrayInit(4, sizeof(SVariablesInfo)); + if (NULL == rsp.variables) { + mError("failed to alloc SVariablesInfo array while process show variables req"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + SVariablesInfo info = {0}; + + strcpy(info.name, "statusInterval"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval); + taosArrayPush(rsp.variables, &info); + + strcpy(info.name, "timezone"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr); + taosArrayPush(rsp.variables, &info); + + strcpy(info.name, "locale"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale); + taosArrayPush(rsp.variables, &info); + + strcpy(info.name, "charset"); + snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset); + taosArrayPush(rsp.variables, &info); + + int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp); + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + tSerializeSShowVariablesRsp(pRsp, rspLen, &rsp); + + pReq->info.rspLen = rspLen; + pReq->info.rsp = pRsp; + code = 0; + +_OVER: + + if (code != 0) { + mError("failed to get show variables info since %s", terrstr()); + } + + tFreeSShowVariablesRsp(&rsp); + + return code; +} + + static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 832e328d962479f739004ae765171ddfeb71aa2d..acbbf993fdeac9f01efd0d8ebd0253b9b2a9e258 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -70,6 +70,7 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); static void mndFreeApp(SAppObj *pApp); static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextApp(SMnode *pMnode, void *pIter); +static int32_t mndProcessSvrVerReq(SRpcMsg *pReq); int32_t mndInitProfile(SMnode *pMnode) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -94,6 +95,7 @@ int32_t mndInitProfile(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq); + mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); @@ -262,8 +264,9 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { connectRsp.connId = pConn->id; connectRsp.connType = connReq.connType; connectRsp.dnodeNum = mndGetDnodeSize(pMnode); - - snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, + + strcpy(connectRsp.sVer, version); + snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); mndGetMnodeEpSet(pMnode, &connectRsp.epSet); @@ -460,6 +463,27 @@ static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnIn return TSDB_CODE_SUCCESS; } +static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) { + SSdb *pSdb = pMnode->pSdb; + SDnodeObj *pDnode = NULL; + int64_t curMs = taosGetTimestampMs(); + void *pIter = NULL; + + while (true) { + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + + bool online = mndIsDnodeOnline(pDnode, curMs); + if (online) { + (*num)++; + } + + sdbRelease(pSdb, pDnode); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -503,7 +527,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb rspBasic->connId = pConn->id; rspBasic->totalDnodes = mndGetDnodeSize(pMnode); - rspBasic->onlineDnodes = 1; // TODO + mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes); mndGetMnodeEpSet(pMnode, &rspBasic->epSet); mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1); @@ -694,6 +718,28 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { } } +static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) { + int32_t code = -1; + SServerVerRsp rsp = {0}; + strcpy(rsp.ver, version); + + int32_t contLen = tSerializeSServerVerRsp(NULL, 0, &rsp); + if (contLen < 0) goto _over; + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) goto _over; + tSerializeSServerVerRsp(pRsp, contLen, &rsp); + + pReq->info.rspLen = contLen; + pReq->info.rsp = pRsp; + + code = 0; + +_over: + + return code; +} + + static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 384c3f19e5ed76333d8bcebcac403e7c7c4c1358..74caa717b8e62b38ec87432cd9fefb86741a8392 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -76,6 +76,7 @@ typedef enum { CTG_TASK_GET_INDEX, CTG_TASK_GET_UDF, CTG_TASK_GET_USER, + CTG_TASK_GET_SVR_VER, } CTG_TASK_TYPE; typedef enum { @@ -224,6 +225,7 @@ typedef struct SCtgJob { int32_t dbInfoNum; int32_t tbIndexNum; int32_t tbCfgNum; + int32_t svrVerNum; } SCtgJob; typedef struct SCtgMsgCtx { @@ -578,8 +580,9 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask); int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask); +int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask); -int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum); +int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum); int32_t ctgLaunchJob(SCtgJob *pJob); int32_t ctgMakeAsyncRes(SCtgJob *pJob); int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 931a944adfbe4db5370ffb6591c4d33403dc0afb..abead99cb01607a503595c9092c0701ace0660d1 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1051,7 +1051,7 @@ _return: CTG_API_LEAVE(code); } -int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) { +int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pConn || NULL == pReq || NULL == fp || NULL == param) { @@ -1060,7 +1060,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, uint64_t int32_t code = 0, taskNum = 0; SCtgJob *pJob = NULL; - CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, reqId, pReq, fp, param, &taskNum)); + CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param, &taskNum)); if (taskNum <= 0) { SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData)); fp(pMetaData, param, TSDB_CODE_SUCCESS); @@ -1247,6 +1247,22 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo *pConn, char** pVersion) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pConn || NULL == pVersion) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgGetSvrVerFromMnode(pCtg, pConn, pVersion, NULL)); + +_return: + + CTG_API_LEAVE(code); +} + + int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) { CTG_API_ENTER(); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 8928a7e028bd4a6cdc9f07ab0de29447dc5b09d8..18c95397dd8b725e262adc1ac8a89fff3a47777a 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -255,6 +255,20 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { return TSDB_CODE_SUCCESS; } +int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { + SCtgTask task = {0}; + + task.type = CTG_TASK_GET_SVR_VER; + task.taskId = taskIdx; + task.pJob = pJob; + + taosArrayPush(pJob->pTasks, &task); + + qDebug("QID:0x%" PRIx64 " [%dth] task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SName *name = (SName*)param; SCtgTask task = {0}; @@ -413,7 +427,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas return TSDB_CODE_SUCCESS; } -int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) { +int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) { int32_t code = 0; int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta); int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup); @@ -421,6 +435,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf); int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0; int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0; + int32_t svrVerNum = pReq->svrVerRequired ? 1 : 0; int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg); int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex); int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser); @@ -428,21 +443,21 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); - *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; + *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; if (*taskNum <= 0) { - ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId); + ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, pConn->requestId); return TSDB_CODE_SUCCESS; } *job = taosMemoryCalloc(1, sizeof(SCtgJob)); if (NULL == *job) { - ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), reqId); + ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } SCtgJob *pJob = *job; - pJob->queryId = reqId; + pJob->queryId = pConn->requestId; pJob->userFp = fp; pJob->pCtg = pCtg; pJob->conn = *pConn; @@ -460,6 +475,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 pJob->dbInfoNum = dbInfoNum; pJob->tbIndexNum = tbIndexNum; pJob->tbCfgNum = tbCfgNum; + pJob->svrVerNum = svrVerNum; pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask)); @@ -530,6 +546,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6 CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DNODE, NULL, NULL)); } + if (svrVerNum) { + CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_SVR_VER, NULL, NULL)); + } + pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob); if (pJob->refId < 0) { ctgError("add job to ref failed, error: %s", tstrerror(terrno)); @@ -728,6 +748,21 @@ int32_t ctgDumpUserRes(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgDumpSvrVer(SCtgTask* pTask) { + SCtgJob* pJob = pTask->pJob; + if (NULL == pJob->jobRes.pSvrVer) { + pJob->jobRes.pSvrVer = taosMemoryCalloc(1, sizeof(SMetaRes)); + if (NULL == pJob->jobRes.pSvrVer) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + pJob->jobRes.pSvrVer->code = pTask->code; + pJob->jobRes.pSvrVer->pRes = pTask->res; + + return TSDB_CODE_SUCCESS; +} + int32_t ctgInvokeSubCb(SCtgTask *pTask) { int32_t code = 0; @@ -1156,6 +1191,20 @@ _return: CTG_RET(code); } +int32_t ctgHandleGetSvrVerRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + + CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); + + TSWAP(pTask->res, pTask->msgCtx.out); + +_return: + + ctgHandleTaskEnd(pTask, code); + + CTG_RET(code); +} + int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; @@ -1459,6 +1508,15 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + + CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask)); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { ctgResetTbMetaTask(pTask); @@ -1532,6 +1590,7 @@ SCtgAsyncFps gCtgAsyncFps[] = { {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL}, {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, + {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, }; int32_t ctgMakeAsyncRes(SCtgJob *pJob) { @@ -1633,7 +1692,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { for (int32_t i = 0; i < taskNum; ++i) { SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); - qDebug("QID:0x%" PRIx64 " ctg start to launch task %d", pJob->queryId, pTask->taskId); + qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); pTask->status = CTG_TASK_LAUNCHED; } diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index 7f2b919f175adfa79a5bc3916813c8b6619e1b3f..5f54362d8e95d4637a734e308d03900060cff03c 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -210,7 +210,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re int64_t jobId = 0; - CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, reqId, &req, ctgdUserCallback, param, &jobId)); + CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, &req, ctgdUserCallback, param, &jobId)); _return: diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 97edb1b837d3a226fe5baabf52efc0fbf5419fc0..304da88888cc04ded5e7b491710d93bb652ea60f 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -217,6 +217,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("Got stb cfg from mnode, tbFName:%s", target); break; } + case TDMT_MND_SERVER_VERSION: { + if (TSDB_CODE_SUCCESS != rspCode) { + qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode)); + CTG_ERR_RET(rspCode); + } + + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); + if (code) { + qError("Process svr ver rsp failed, error:%s", tstrerror(code)); + CTG_ERR_RET(code); + } + + qDebug("Got svr ver from mnode"); + break; + } default: qError("invalid req type %s", TMSG_INFO(reqType)); return TSDB_CODE_APP_ERROR; @@ -811,4 +826,38 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S return TSDB_CODE_SUCCESS; } +int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) { + char *msg = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_MND_SERVER_VERSION; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + + qDebug("try to get svr ver from mnode"); + + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp); + if (code) { + ctgError("Build get svr ver msg failed, code:%s", tstrerror(code)); + CTG_ERR_RET(code); + } + + if (pTask) { + CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL)); + + CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); + } + + SRpcMsg rpcMsg = { + .msgType = reqType, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); + + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL)); + + return TSDB_CODE_SUCCESS; +} + diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index cc823adad00e47505e8dd49a0a98fbfab2919c86..0c10b69a2dca97472940cda05fcc25fa8f2af165 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -45,6 +45,8 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) { return "[get udf]"; case CTG_TASK_GET_USER: return "[get user]"; + case CTG_TASK_GET_SVR_VER: + return "[get svr ver]"; default: return "unknown"; } @@ -103,8 +105,13 @@ void ctgFreeSMetaData(SMetaData* pData) { taosArrayDestroy(pData->pQnodeList); pData->pQnodeList = NULL; + taosArrayDestroy(pData->pDnodeList); + pData->pDnodeList = NULL; + taosArrayDestroy(pData->pTableCfg); pData->pTableCfg = NULL; + + taosMemoryFreeClear(pData->pSvrVer); } void ctgFreeSCtgUserAuth(SCtgUserAuth *userCache) { @@ -346,20 +353,8 @@ void ctgResetTbMetaTask(SCtgTask* pTask) { void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { switch (type) { - case CTG_TASK_GET_QNODE: { - taosArrayDestroy((SArray*)*pRes); - *pRes = NULL; - break; - } - case CTG_TASK_GET_DNODE: { - taosArrayDestroy((SArray*)*pRes); - *pRes = NULL; - break; - } - case CTG_TASK_GET_TB_META: { - taosMemoryFreeClear(*pRes); - break; - } + case CTG_TASK_GET_QNODE: + case CTG_TASK_GET_DNODE: case CTG_TASK_GET_DB_VGROUP: { taosArrayDestroy((SArray*)*pRes); *pRes = NULL; @@ -373,14 +368,6 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { } break; } - case CTG_TASK_GET_DB_INFO: { - taosMemoryFreeClear(*pRes); - break; - } - case CTG_TASK_GET_TB_HASH: { - taosMemoryFreeClear(*pRes); - break; - } case CTG_TASK_GET_TB_INDEX: { taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo); *pRes = NULL; @@ -394,15 +381,13 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { } break; } - case CTG_TASK_GET_INDEX: { - taosMemoryFreeClear(*pRes); - break; - } - case CTG_TASK_GET_UDF: { - taosMemoryFreeClear(*pRes); - break; - } - case CTG_TASK_GET_USER: { + case CTG_TASK_GET_TB_HASH: + case CTG_TASK_GET_DB_INFO: + case CTG_TASK_GET_INDEX: + case CTG_TASK_GET_UDF: + case CTG_TASK_GET_USER: + case CTG_TASK_GET_SVR_VER: + case CTG_TASK_GET_TB_META: { taosMemoryFreeClear(*pRes); break; } @@ -415,20 +400,12 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void **pRes) { void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { switch (type) { - case CTG_TASK_GET_QNODE: { - taosArrayDestroy((SArray*)*pRes); - *pRes = NULL; - break; - } + case CTG_TASK_GET_QNODE: case CTG_TASK_GET_DNODE: { taosArrayDestroy((SArray*)*pRes); *pRes = NULL; break; } - case CTG_TASK_GET_TB_META: { - taosMemoryFreeClear(*pRes); - break; - } case CTG_TASK_GET_DB_VGROUP: { if (*pRes) { SDBVgInfo* pInfo = (SDBVgInfo*)*pRes; @@ -445,14 +422,6 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { } break; } - case CTG_TASK_GET_DB_INFO: { - taosMemoryFreeClear(*pRes); - break; - } - case CTG_TASK_GET_TB_HASH: { - taosMemoryFreeClear(*pRes); - break; - } case CTG_TASK_GET_TB_INDEX: { taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo); *pRes = NULL; @@ -466,14 +435,12 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void **pRes) { } break; } - case CTG_TASK_GET_INDEX: { - taosMemoryFreeClear(*pRes); - break; - } - case CTG_TASK_GET_UDF: { - taosMemoryFreeClear(*pRes); - break; - } + case CTG_TASK_GET_TB_META: + case CTG_TASK_GET_DB_INFO: + case CTG_TASK_GET_TB_HASH: + case CTG_TASK_GET_INDEX: + case CTG_TASK_GET_UDF: + case CTG_TASK_GET_SVR_VER: case CTG_TASK_GET_USER: { taosMemoryFreeClear(*pRes); break; @@ -497,10 +464,6 @@ void ctgClearSubTaskRes(SCtgSubRes *pRes) { void ctgFreeTaskCtx(SCtgTask* pTask) { switch (pTask->type) { - case CTG_TASK_GET_QNODE: { - taosMemoryFreeClear(pTask->taskCtx); - break; - } case CTG_TASK_GET_TB_META: { SCtgTbMetaCtx* taskCtx = (SCtgTbMetaCtx*)pTask->taskCtx; taosMemoryFreeClear(taskCtx->pName); @@ -511,18 +474,6 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { taosMemoryFreeClear(pTask->taskCtx); break; } - case CTG_TASK_GET_DB_VGROUP: { - taosMemoryFreeClear(pTask->taskCtx); - break; - } - case CTG_TASK_GET_DB_CFG: { - taosMemoryFreeClear(pTask->taskCtx); - break; - } - case CTG_TASK_GET_DB_INFO: { - taosMemoryFreeClear(pTask->taskCtx); - break; - } case CTG_TASK_GET_TB_HASH: { SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx; taosMemoryFreeClear(taskCtx->pName); @@ -542,14 +493,12 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { taosMemoryFreeClear(pTask->taskCtx); break; } - case CTG_TASK_GET_INDEX: { - taosMemoryFreeClear(pTask->taskCtx); - break; - } - case CTG_TASK_GET_UDF: { - taosMemoryFreeClear(pTask->taskCtx); - break; - } + case CTG_TASK_GET_DB_VGROUP: + case CTG_TASK_GET_DB_CFG: + case CTG_TASK_GET_DB_INFO: + case CTG_TASK_GET_INDEX: + case CTG_TASK_GET_UDF: + case CTG_TASK_GET_QNODE: case CTG_TASK_GET_USER: { taosMemoryFreeClear(pTask->taskCtx); break; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7b5c30d3cb25467d007644eceb3cb5ba740a6403..58b50b776f6560500d6a1c8a7301d175c4f250e7 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4611,7 +4611,7 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema** return TSDB_CODE_SUCCESS; } -static int32_t extractShowLocalVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) { +static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) { *numOfCols = 2; *pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema)); if (NULL == (*pSchema)) { @@ -4649,7 +4649,8 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS case QUERY_NODE_SHOW_CREATE_STABLE_STMT: return extractShowCreateTableResultSchema(numOfCols, pSchema); case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT: - return extractShowLocalVariablesResultSchema(numOfCols, pSchema); + case QUERY_NODE_SHOW_VARIABLES_STMT: + return extractShowVariablesResultSchema(numOfCols, pSchema); default: break; } @@ -5874,7 +5875,6 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_SHOW_CLUSTER_STMT: case QUERY_NODE_SHOW_TOPICS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT: - case QUERY_NODE_SHOW_VARIABLES_STMT: case QUERY_NODE_SHOW_APPS_STMT: code = rewriteShow(pCxt, pQuery); break; @@ -5974,6 +5974,14 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_ALTER_LOCAL_STMT: pQuery->execMode = QUERY_EXEC_MODE_LOCAL; break; + case QUERY_NODE_SHOW_VARIABLES_STMT: + pQuery->haveResultSet = true; + pQuery->execMode = QUERY_EXEC_MODE_RPC; + if (NULL != pCxt->pCmdMsg) { + TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg); + pQuery->msgType = pQuery->pCmdMsg->msgType; + } + break; default: pQuery->execMode = QUERY_EXEC_MODE_RPC; if (NULL != pCxt->pCmdMsg) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index df608412b03164b590b62e37aca3cf9f9f7d7e83..b433a880015beaaa282be72f1a4063eae1a5c3c6 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -144,6 +144,23 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SServerVerReq req = {0}; + + int32_t bufLen = tSerializeSServerVerReq(NULL, 0, &req); + void *pBuf = (*mallcFp)(bufLen); + tSerializeSServerVerReq(pBuf, bufLen, &req); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} + int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { if (NULL == msg || NULL == msgLen) { @@ -467,6 +484,26 @@ int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) { return code; } +int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) { + SServerVerRsp out = {0}; + int32_t code = 0; + + if (NULL == output || NULL == msg || msgSize <= 0) { + code = TSDB_CODE_TSC_INVALID_INPUT; + return code; + } + + if (tDeserializeSServerVerRsp(msg, msgSize, &out) != 0) { + qError("invalid svr ver rsp msg, msgSize:%d", msgSize); + code = TSDB_CODE_INVALID_MSG; + return code; + } + + *(char**)output = strdup(out.ver); + + return code; +} + int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) { SDbCfgRsp out = {0}; @@ -583,6 +620,7 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg; queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; @@ -596,6 +634,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp; } #pragma GCC diagnostic pop