diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 95cd82c9fde7de69e24ffd4b7d233bae1f29a992..93d83eb4190adeaa03407e48aac14bbc6529a3c8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1330,13 +1330,13 @@ typedef struct SVShowTablesReq { } SVShowTablesReq; typedef struct SVShowTablesRsp { - int64_t id; + int32_t id; STableMetaMsg metaInfo; } SVShowTablesRsp; typedef struct SVShowTablesFetchReq { SMsgHead head; - int64_t id; + int32_t id; } SVShowTablesFetchReq; typedef struct SVShowTablesFetchRsp { diff --git a/include/common/tname.h b/include/common/tname.h index 3ac7f8b27b40178e73f8f9726ed45a8d27d9f952..11d97dac06d7240664f48626cd9d58ad6c555afa 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -17,6 +17,7 @@ #define TDENGINE_TNAME_H #include "tdef.h" +#include "tmsg.h" #define TSDB_DB_NAME_T 1 #define TSDB_TABLE_NAME_T 2 @@ -58,4 +59,6 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type); int32_t tNameSetAcctId(SName* dst, int32_t acctId); +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); + #endif // TDENGINE_TNAME_H diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index dd3d92866fbf1a1864d37db32a00576e39693b0f..a9e1f26d2014225a5a8481b497906f1d1f34aab4 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -74,7 +74,6 @@ int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo); STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex); SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex); -SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name); int32_t getNewResColId(); void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn); diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 83047a44de3e210f611ff8d83426758a79384f9d..9897467230675dae1b4b3e7909acd748504dcec7 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -54,8 +54,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -void qWorkerDestroy(void **qWorkerMgmt); +int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); + +int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +void qWorkerDestroy(void **qWorkerMgmt); #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3d295830c75f17adb86bf5d64093be77be5a6b4a..efe327242a5619f0f3c7271b838df00845050bd5 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -490,7 +490,13 @@ void* doFetchRow(SRequestObj* pRequest) { SReqResultInfo* pResultInfo = &pRequest->body.resInfo; if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { - pRequest->type = TDMT_MND_SHOW_RETRIEVE; + if (pRequest->type == TDMT_MND_SHOW) { + pRequest->type = TDMT_MND_SHOW_RETRIEVE; + } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { + pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; + } else { + // do nothing + } SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f7cf6610197f60e476c11ea7cbd87b7f602e1e02..6bfbcb4f7efc40ec51d64d43aa2bdfe1bd8cf976 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -74,39 +74,41 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) { - pMsgSendInfo->msgType = TDMT_MND_SHOW_RETRIEVE; - pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg); - pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->param = pRequest; - pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; - - SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); - if (pRetrieveMsg == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - pRetrieveMsg->showId = htonl(pRequest->body.execId); - pMsgSendInfo->msgInfo.pData = pRetrieveMsg; - return TSDB_CODE_SUCCESS; -} - SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); - if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { - buildRetrieveMnodeMsg(pRequest, pMsgSendInfo); + pMsgSendInfo->requestObjRefId = pRequest->self; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->param = pRequest; + pMsgSendInfo->msgType = pRequest->type; + + if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { + if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { + SRetrieveTableMsg* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); + if (pRetrieveMsg == NULL) { + return NULL; + } + + pRetrieveMsg->showId = htonl(pRequest->body.execId); + pMsgSendInfo->msgInfo.pData = pRetrieveMsg; + pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg); + } else { + SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq)); + if (pFetchMsg == NULL) { + return NULL; + } + + pFetchMsg->id = htonl(pRequest->body.execId); + pFetchMsg->head.vgId = htonl(13); + pMsgSendInfo->msgInfo.pData = pFetchMsg; + pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq); + } } else { assert(pRequest != NULL); - pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->msgInfo = pRequest->body.requestMsg; - pMsgSendInfo->msgType = pRequest->type; - pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->param = pRequest; - - pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; } + pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; return pMsgSendInfo; } @@ -128,6 +130,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { pMetaMsg->tuid = htobe64(pMetaMsg->tuid); for (int i = 0; i < pMetaMsg->numOfColumns; ++i) { pSchema->bytes = htonl(pSchema->bytes); + pSchema->colId = htonl(pSchema->colId); pSchema++; } @@ -157,19 +160,23 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) assert(pMsg->len >= sizeof(SRetrieveTableRsp)); SRequestObj* pRequest = param; -// tfree(pRequest->body.resInfo.pRspMsg); -// pRequest->body.resInfo.pRspMsg = pMsg->pData; + SReqResultInfo* pResInfo = &pRequest->body.resInfo; + + tfree(pResInfo->pRspMsg); + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + terrno = code; + tsem_post(&pRequest->body.rspSem); + return code; + } SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData; pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->precision = htons(pRetrieve->precision); - SReqResultInfo* pResInfo = &pRequest->body.resInfo; - - tfree(pResInfo->pRspMsg); pResInfo->pRspMsg = pMsg->pData; pResInfo->numOfRows = pRetrieve->numOfRows; - pResInfo->pData = pRetrieve->data; // todo fix this in async model + pResInfo->pData = pRetrieve->data; pResInfo->current = 0; setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); @@ -181,6 +188,42 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) return 0; } +int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) { + assert(pMsg->len >= sizeof(SRetrieveTableRsp)); + + SRequestObj* pRequest = param; + tfree(pRequest->body.resInfo.pRspMsg); + + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + terrno = code; + tsem_post(&pRequest->body.rspSem); + return code; + } + + pRequest->body.resInfo.pRspMsg = pMsg->pData; + + SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData; + pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows); + pFetchRsp->precision = htons(pFetchRsp->precision); + + SReqResultInfo* pResInfo = &pRequest->body.resInfo; + + tfree(pResInfo->pRspMsg); + pResInfo->pRspMsg = pMsg->pData; + pResInfo->numOfRows = pFetchRsp->numOfRows; + pResInfo->pData = pFetchRsp->data; + + pResInfo->current = 0; + setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); + + tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows, + pFetchRsp->completed, pRequest->body.execId); + + tsem_post(&pRequest->body.rspSem); + return 0; +} + int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { // todo rsp with the vnode id list SRequestObj* pRequest = param; @@ -304,4 +347,7 @@ void initMsgHandleFp() { handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; + + handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp; } \ No newline at end of file diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e88e7411bfa3562b6ad182d8ba6bed21fc24ebae..be6e048378572342f010b0b806771cd34197c978 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -227,25 +227,25 @@ TEST(testCase, use_db_test) { } TEST(testCase, drop_db_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - showDB(pConn); - - TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - showDB(pConn); - - pRes = taos_query(pConn, "create database abc1"); - if (taos_errno(pRes) != 0) { - printf("create to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - taos_close(pConn); +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// showDB(pConn); +// +// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// showDB(pConn); +// +// pRes = taos_query(pConn, "create database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// taos_close(pConn); } TEST(testCase, create_stable_Test) { @@ -302,12 +302,12 @@ TEST(testCase, create_ctable_Test) { } taos_free_result(pRes); - pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); +// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); taos_close(pConn); } @@ -443,7 +443,23 @@ TEST(testCase, show_table_Test) { taos_free_result(pRes); pRes = taos_query(pConn, "show tables"); - taos_free_result(pRes); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); taos_close(pConn); } diff --git a/source/common/src/tname.c b/source/common/src/tname.c index 88a5ebb7f6eda28ddfb0b65ae7e5dc7030fada9f..f8ef9f097951c346fd2147c19b70fb20dc3ab920 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -259,3 +259,13 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { return 0; } + +SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { + SSchema s = {0}; + s.type = type; + s.bytes = bytes; + s.colId = colId; + + tstrncpy(s.name, name, tListLen(s.name)); + return s; +} \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index b726743c9a0067ca37d21bdb3aa272964c5492f0..2c3be4a700a47354a1fd379c0a33547314b19c06 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -141,6 +141,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 1c6924040c34775c7927eb9c85af252af7dcf072..6632676d8bb336513295f25280c495a831d97382 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -36,12 +36,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); case TDMT_VND_DROP_TASK: return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_SHOW_TABLES: + return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_SHOW_TABLES_FETCH: + return qWorkerProcessShowFetchMsg(pVnode, pVnode->pQuery, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; - break; } - return 0; } static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index bdbc095861b1199d95eea8952d92ba596369691a..6b0e0ed828ff70739a09659b1cf2bba8b6c6e3cb 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -90,7 +90,6 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg)); pShowMsg->type = pShowInfo->showType; - if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) { SToken* pPattern = &pShowInfo->pattern; if (pPattern->type > 0) { // only show tables support wildcard query diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 7160b13eba2a94a8629f1f1176452a2d0c1fa29a..8a09ea9ed8a22aa2a22778562c846618d7db1868 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -18,7 +18,7 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) { } static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen, - SMsgBuf* pMsgBuf) { + SEpSet* pEpSet, SMsgBuf* pMsgBuf) { const char* msg1 = "invalid name"; const char* msg2 = "wildcard string should be less than %d characters"; const char* msg3 = "database name too long"; @@ -31,57 +31,69 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou * wildcard in like clause in pInfo->pMiscInfo->a[1] */ int16_t showType = pShowInfo->showType; - if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) { - SToken* pDbPrefixToken = &pShowInfo->prefix; - if (pDbPrefixToken->type != 0) { - if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long - return buildInvalidOperationMsg(pMsgBuf, msg3); - } + if (showType == TSDB_MGMT_TABLE_TABLE) { + SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); + *pEpSet = pCtx->mgmtEpSet; + + // catalogGetDBVgroupVersion() + pShowReq->head.vgId = htonl(13); + *outputLen = sizeof(SVShowTablesReq); + *output = pShowReq; + } else { + if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) { + SToken* pDbPrefixToken = &pShowInfo->prefix; + if (pDbPrefixToken->type != 0) { + if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long + return buildInvalidOperationMsg(pMsgBuf, msg3); + } - if (pDbPrefixToken->n <= 0) { - return buildInvalidOperationMsg(pMsgBuf, msg5); - } + if (pDbPrefixToken->n <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg5); + } - if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + + // int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken); + // if (ret != TSDB_CODE_SUCCESS) { + // return buildInvalidOperationMsg(pMsgBuf, msg1); + // } } - // int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken); - // if (ret != TSDB_CODE_SUCCESS) { - // return buildInvalidOperationMsg(pMsgBuf, msg1); - // } - } + // show table/stable like 'xxxx', set the like pattern for show tables + SToken* pPattern = &pShowInfo->pattern; + if (pPattern->type != 0) { + if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) { + return buildInvalidOperationMsg(pMsgBuf, msg4); + } - // show table/stable like 'xxxx', set the like pattern for show tables - SToken* pPattern = &pShowInfo->pattern; - if (pPattern->type != 0) { - if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) { - return buildInvalidOperationMsg(pMsgBuf, msg4); - } + pPattern->n = strdequote(pPattern->z); + if (pPattern->n <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg6); + } - pPattern->n = strdequote(pPattern->z); - if (pPattern->n <= 0) { - return buildInvalidOperationMsg(pMsgBuf, msg6); + if (pPattern->n > tsMaxWildCardsLen) { + char tmp[64] = {0}; + sprintf(tmp, msg2, tsMaxWildCardsLen); + return buildInvalidOperationMsg(pMsgBuf, tmp); + } + } + } else if (showType == TSDB_MGMT_TABLE_VNODES) { + if (pShowInfo->prefix.type == 0) { + return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep"); } - if (pPattern->n > tsMaxWildCardsLen) { - char tmp[64] = {0}; - sprintf(tmp, msg2, tsMaxWildCardsLen); - return buildInvalidOperationMsg(pMsgBuf, tmp); + if (pShowInfo->prefix.type == TK_STRING) { + pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z); } } - } else if (showType == TSDB_MGMT_TABLE_VNODES) { - if (pShowInfo->prefix.type == 0) { - return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep"); - } - if (pShowInfo->prefix.type == TK_STRING) { - pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z); - } + *pEpSet = pCtx->mgmtEpSet; + *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len); + *outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/; } - *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len); - *outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/; return TSDB_CODE_SUCCESS; } @@ -608,8 +620,9 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } case TSDB_SQL_SHOW: { - code = setShowInfo(&pInfo->pMiscInfo->showOpt, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, pMsgBuf); - pDcl->msgType = TDMT_MND_SHOW; + SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; + code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, pMsgBuf); + pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW; break; } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 20f330247e57c756874cab299b41a45095cfb25a..a454061fd7dd28c3fccd4f3e489b58a4091cfd9b 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -563,16 +563,6 @@ TAOS_FIELD createField(const SSchema* pSchema) { return f; } -SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name) { - SSchema s = {0}; - s.type = type; - s.bytes = bytes; - s.colId = colId; - - tstrncpy(s.name, name, tListLen(s.name)); - return s; -} - void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) { pColumn->uid = uid; pColumn->flag = flag; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 4296e82a5617b2847c02011fb76b5f808602cff2..d5ecd40ccd140a5c140604bfd6ba69cf17e21342 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1,8 +1,9 @@ -#include "tmsg.h" -#include "query.h" #include "qworker.h" -#include "qworkerInt.h" +#include "tname.h" #include "planner.h" +#include "query.h" +#include "qworkerInt.h" +#include "tmsg.h" int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t code = 0; @@ -634,7 +635,6 @@ int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { return TSDB_CODE_SUCCESS; } - int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); pRsp->code = code; @@ -665,11 +665,68 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { }; rpcSendResponse(&rpcRsp); + return TSDB_CODE_SUCCESS; +} + +int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { + int32_t numOfCols = 6; + SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols); + + int32_t cols = 0; + SSchema *pSchema = pRsp->metaInfo.pSchema; + + const SSchema *s = tGetTbnameColumnSchema(); + *pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "name"); + pSchema++; + + int32_t type = TSDB_DATA_TYPE_TIMESTAMP; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "created"); + pSchema++; + + type = TSDB_DATA_TYPE_SMALLINT; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "columns"); + pSchema++; + + *pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "stable"); + pSchema++; + + type = TSDB_DATA_TYPE_BIGINT; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "uid"); + pSchema++; + + type = TSDB_DATA_TYPE_INT; + *pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "vgId"); + + pRsp->metaInfo.numOfColumns = htonl(cols); + + SRpcMsg rpcMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + rpcSendResponse(&rpcMsg); return TSDB_CODE_SUCCESS; } +int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { + SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); + int32_t handle = htonl(pFetchReq->id); + + pRsp->numOfRows = 0; + SRpcMsg rpcMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = 0, + }; + rpcSendResponse(&rpcMsg); + return TSDB_CODE_SUCCESS; +} int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { SQWSchStatus *sch = NULL; @@ -801,7 +858,6 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI return TSDB_CODE_SUCCESS; } - int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; @@ -911,7 +967,6 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 return TSDB_CODE_SUCCESS; } - int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); if (NULL == mgmt) { @@ -1157,6 +1212,25 @@ _return: return TSDB_CODE_SUCCESS; } +int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + int32_t code = 0; + SVShowTablesReq *pReq = pMsg->pCont; + QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code)); +} + +int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SVShowTablesFetchReq *pFetchReq = pMsg->pCont; + QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); +} + int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; int8_t status = 0; @@ -1182,7 +1256,6 @@ int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_RET(code); } - void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { return;