提交 be7bccae 编写于 作者: H Haojun Liao

[td-11818] support show tables;

上级 d7912feb
...@@ -1245,13 +1245,13 @@ typedef struct SVShowTablesReq { ...@@ -1245,13 +1245,13 @@ typedef struct SVShowTablesReq {
} SVShowTablesReq; } SVShowTablesReq;
typedef struct SVShowTablesRsp { typedef struct SVShowTablesRsp {
int64_t id; int32_t id;
STableMetaMsg metaInfo; STableMetaMsg metaInfo;
} SVShowTablesRsp; } SVShowTablesRsp;
typedef struct SVShowTablesFetchReq { typedef struct SVShowTablesFetchReq {
SMsgHead head; SMsgHead head;
int64_t id; int32_t id;
} SVShowTablesFetchReq; } SVShowTablesFetchReq;
typedef struct SVShowTablesFetchRsp { typedef struct SVShowTablesFetchRsp {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define TDENGINE_TNAME_H #define TDENGINE_TNAME_H
#include "tdef.h" #include "tdef.h"
#include "tmsg.h"
#define TSDB_DB_NAME_T 1 #define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2 #define TSDB_TABLE_NAME_T 2
...@@ -58,4 +59,6 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type); ...@@ -58,4 +59,6 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
int32_t tNameSetAcctId(SName* dst, int32_t acctId); int32_t tNameSetAcctId(SName* dst, int32_t acctId);
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name);
#endif // TDENGINE_TNAME_H #endif // TDENGINE_TNAME_H
...@@ -74,7 +74,6 @@ int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo); ...@@ -74,7 +74,6 @@ int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo);
STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex); STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex); SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
int32_t getNewResColId(); int32_t getNewResColId();
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn); void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
......
...@@ -54,8 +54,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); ...@@ -54,8 +54,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
void qWorkerDestroy(void **qWorkerMgmt);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -480,7 +480,13 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -480,7 +480,13 @@ void* doFetchRow(SRequestObj* pRequest) {
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE; if (pRequest->type == TDMT_MND_SHOW) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
} else {
// do nothing
}
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
......
...@@ -74,39 +74,41 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -74,39 +74,41 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) {
pMsgSendInfo->msgType = TDMT_MND_SHOW_RETRIEVE;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
if (pRetrieveMsg == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
return TSDB_CODE_SUCCESS;
}
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) { SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { pMsgSendInfo->requestObjRefId = pRequest->self;
buildRetrieveMnodeMsg(pRequest, pMsgSendInfo); pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->msgType = pRequest->type;
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
SRetrieveTableMsg* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
if (pRetrieveMsg == NULL) {
return NULL;
}
pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
} else {
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
if (pFetchMsg == NULL) {
return NULL;
}
pFetchMsg->id = htonl(pRequest->body.execId);
pFetchMsg->head.vgId = htonl(13);
pMsgSendInfo->msgInfo.pData = pFetchMsg;
pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
}
} else { } else {
assert(pRequest != NULL); assert(pRequest != NULL);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->msgInfo = pRequest->body.requestMsg; pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
pMsgSendInfo->msgType = pRequest->type;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
} }
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
return pMsgSendInfo; return pMsgSendInfo;
} }
...@@ -128,6 +130,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -128,6 +130,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pMetaMsg->tuid = htobe64(pMetaMsg->tuid); pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) { for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
pSchema->bytes = htonl(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
pSchema++; pSchema++;
} }
...@@ -157,8 +160,16 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) ...@@ -157,8 +160,16 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
assert(pMsg->len >= sizeof(SRetrieveTableRsp)); assert(pMsg->len >= sizeof(SRetrieveTableRsp));
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
// tfree(pRequest->body.resInfo.pRspMsg); tfree(pRequest->body.resInfo.pRspMsg);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
terrno = code;
tsem_post(&pRequest->body.rspSem);
return code;
}
pRequest->body.resInfo.pRspMsg = pMsg->pData;
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData; SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
...@@ -169,7 +180,7 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) ...@@ -169,7 +180,7 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
tfree(pResInfo->pRspMsg); tfree(pResInfo->pRspMsg);
pResInfo->pRspMsg = pMsg->pData; pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pRetrieve->numOfRows; pResInfo->numOfRows = pRetrieve->numOfRows;
pResInfo->pData = pRetrieve->data; // todo fix this in async model pResInfo->pData = pRetrieve->data;
pResInfo->current = 0; pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
...@@ -181,6 +192,42 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) ...@@ -181,6 +192,42 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
return 0; return 0;
} }
int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
SRequestObj* pRequest = param;
tfree(pRequest->body.resInfo.pRspMsg);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
terrno = code;
tsem_post(&pRequest->body.rspSem);
return code;
}
pRequest->body.resInfo.pRspMsg = pMsg->pData;
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData;
pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows);
pFetchRsp->precision = htons(pFetchRsp->precision);
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
tfree(pResInfo->pRspMsg);
pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pFetchRsp->numOfRows;
pResInfo->pData = pFetchRsp->data;
pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
pFetchRsp->completed, pRequest->body.execId);
tsem_post(&pRequest->body.rspSem);
return 0;
}
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list // todo rsp with the vnode id list
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
...@@ -297,4 +344,7 @@ void initMsgHandleFp() { ...@@ -297,4 +344,7 @@ void initMsgHandleFp() {
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
} }
\ No newline at end of file
...@@ -226,25 +226,25 @@ TEST(testCase, use_db_test) { ...@@ -226,25 +226,25 @@ TEST(testCase, use_db_test) {
} }
TEST(testCase, drop_db_test) { TEST(testCase, drop_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); // assert(pConn != NULL);
//
showDB(pConn); // showDB(pConn);
//
TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); // TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); // printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
} // }
taos_free_result(pRes); // taos_free_result(pRes);
//
showDB(pConn); // showDB(pConn);
//
pRes = taos_query(pConn, "create database abc1"); // pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("create to drop db, reason:%s\n", taos_errstr(pRes)); // printf("create to drop db, reason:%s\n", taos_errstr(pRes));
} // }
taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); // taos_close(pConn);
} }
TEST(testCase, create_stable_Test) { TEST(testCase, create_stable_Test) {
...@@ -301,12 +301,12 @@ TEST(testCase, create_ctable_Test) { ...@@ -301,12 +301,12 @@ TEST(testCase, create_ctable_Test) {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); // pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); // printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
} // }
//
taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
...@@ -441,7 +441,23 @@ TEST(testCase, show_table_Test) { ...@@ -441,7 +441,23 @@ TEST(testCase, show_table_Test) {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "show tables"); pRes = taos_query(pConn, "show tables");
taos_free_result(pRes); if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
...@@ -259,3 +259,13 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { ...@@ -259,3 +259,13 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return 0; return 0;
} }
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) {
SSchema s = {0};
s.type = type;
s.bytes = bytes;
s.colId = colId;
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
\ No newline at end of file
...@@ -123,6 +123,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -123,6 +123,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
} }
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
......
...@@ -36,12 +36,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -36,12 +36,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_SHOW_TABLES:
return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_SHOW_TABLES_FETCH:
return qWorkerProcessShowFetchMsg(pVnode, pVnode->pQuery, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
break;
} }
return 0;
} }
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
......
...@@ -90,7 +90,6 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, ...@@ -90,7 +90,6 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf,
SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg)); SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg));
pShowMsg->type = pShowInfo->showType; pShowMsg->type = pShowInfo->showType;
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) { if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
SToken* pPattern = &pShowInfo->pattern; SToken* pPattern = &pShowInfo->pattern;
if (pPattern->type > 0) { // only show tables support wildcard query if (pPattern->type > 0) { // only show tables support wildcard query
......
...@@ -18,7 +18,7 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) { ...@@ -18,7 +18,7 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) {
} }
static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen, static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen,
SMsgBuf* pMsgBuf) { SEpSet* pEpSet, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid name"; const char* msg1 = "invalid name";
const char* msg2 = "wildcard string should be less than %d characters"; const char* msg2 = "wildcard string should be less than %d characters";
const char* msg3 = "database name too long"; const char* msg3 = "database name too long";
...@@ -31,57 +31,69 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou ...@@ -31,57 +31,69 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
* wildcard in like clause in pInfo->pMiscInfo->a[1] * wildcard in like clause in pInfo->pMiscInfo->a[1]
*/ */
int16_t showType = pShowInfo->showType; int16_t showType = pShowInfo->showType;
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) { if (showType == TSDB_MGMT_TABLE_TABLE) {
SToken* pDbPrefixToken = &pShowInfo->prefix; SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
if (pDbPrefixToken->type != 0) { *pEpSet = pCtx->mgmtEpSet;
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
return buildInvalidOperationMsg(pMsgBuf, msg3); // catalogGetDBVgroupVersion()
} pShowReq->head.vgId = htonl(13);
*outputLen = sizeof(SVShowTablesReq);
*output = pShowReq;
} else {
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
SToken* pDbPrefixToken = &pShowInfo->prefix;
if (pDbPrefixToken->type != 0) {
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (pDbPrefixToken->n <= 0) { if (pDbPrefixToken->n <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg5); return buildInvalidOperationMsg(pMsgBuf, msg5);
} }
if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) { if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
}
// int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken);
// if (ret != TSDB_CODE_SUCCESS) {
// return buildInvalidOperationMsg(pMsgBuf, msg1);
// }
} }
// int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken); // show table/stable like 'xxxx', set the like pattern for show tables
// if (ret != TSDB_CODE_SUCCESS) { SToken* pPattern = &pShowInfo->pattern;
// return buildInvalidOperationMsg(pMsgBuf, msg1); if (pPattern->type != 0) {
// } if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) {
} return buildInvalidOperationMsg(pMsgBuf, msg4);
}
// show table/stable like 'xxxx', set the like pattern for show tables pPattern->n = strdequote(pPattern->z);
SToken* pPattern = &pShowInfo->pattern; if (pPattern->n <= 0) {
if (pPattern->type != 0) { return buildInvalidOperationMsg(pMsgBuf, msg6);
if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) { }
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
pPattern->n = strdequote(pPattern->z); if (pPattern->n > tsMaxWildCardsLen) {
if (pPattern->n <= 0) { char tmp[64] = {0};
return buildInvalidOperationMsg(pMsgBuf, msg6); sprintf(tmp, msg2, tsMaxWildCardsLen);
return buildInvalidOperationMsg(pMsgBuf, tmp);
}
}
} else if (showType == TSDB_MGMT_TABLE_VNODES) {
if (pShowInfo->prefix.type == 0) {
return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep");
} }
if (pPattern->n > tsMaxWildCardsLen) { if (pShowInfo->prefix.type == TK_STRING) {
char tmp[64] = {0}; pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z);
sprintf(tmp, msg2, tsMaxWildCardsLen);
return buildInvalidOperationMsg(pMsgBuf, tmp);
} }
} }
} else if (showType == TSDB_MGMT_TABLE_VNODES) {
if (pShowInfo->prefix.type == 0) {
return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep");
}
if (pShowInfo->prefix.type == TK_STRING) { *pEpSet = pCtx->mgmtEpSet;
pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z); *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
} *outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/;
} }
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
*outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -608,8 +620,9 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -608,8 +620,9 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} }
case TSDB_SQL_SHOW: { case TSDB_SQL_SHOW: {
code = setShowInfo(&pInfo->pMiscInfo->showOpt, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, pMsgBuf); SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
pDcl->msgType = TDMT_MND_SHOW; code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, pMsgBuf);
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW;
break; break;
} }
......
...@@ -563,16 +563,6 @@ TAOS_FIELD createField(const SSchema* pSchema) { ...@@ -563,16 +563,6 @@ TAOS_FIELD createField(const SSchema* pSchema) {
return f; return f;
} }
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name) {
SSchema s = {0};
s.type = type;
s.bytes = bytes;
s.colId = colId;
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) { void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) {
pColumn->uid = uid; pColumn->uid = uid;
pColumn->flag = flag; pColumn->flag = flag;
......
#include "tmsg.h"
#include "query.h"
#include "qworker.h" #include "qworker.h"
#include "qworkerInt.h" #include "tname.h"
#include "planner.h" #include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "tmsg.h"
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
...@@ -634,7 +635,6 @@ int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { ...@@ -634,7 +635,6 @@ int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code; pRsp->code = code;
...@@ -665,11 +665,68 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -665,11 +665,68 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
int32_t numOfCols = 6;
SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols);
int32_t cols = 0;
SSchema *pSchema = pRsp->metaInfo.pSchema;
const SSchema *s = tGetTbnameColumnSchema();
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "name");
pSchema++;
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "created");
pSchema++;
type = TSDB_DATA_TYPE_SMALLINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "columns");
pSchema++;
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "stable");
pSchema++;
type = TSDB_DATA_TYPE_BIGINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "uid");
pSchema++;
type = TSDB_DATA_TYPE_INT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "vgId");
pRsp->metaInfo.numOfColumns = htonl(cols);
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id);
pRsp->numOfRows = 0;
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
...@@ -801,7 +858,6 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI ...@@ -801,7 +858,6 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
...@@ -911,7 +967,6 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 ...@@ -911,7 +967,6 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) { if (NULL == mgmt) {
...@@ -1157,6 +1212,25 @@ _return: ...@@ -1157,6 +1212,25 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code));
}
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
}
int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = 0;
...@@ -1182,7 +1256,6 @@ int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1182,7 +1256,6 @@ int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_RET(code); QW_RET(code);
} }
void qWorkerDestroy(void **qWorkerMgmt) { void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册