未验证 提交 98648acd 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #11157 from taosdata/feature/3.0_wxy

TD-14405 TD-14404 desc and reset query cache statement implement
......@@ -138,59 +138,62 @@
#define TK_INTERVAL 120
#define TK_TOPIC 121
#define TK_AS 122
#define TK_EXPLAIN 123
#define TK_ANALYZE 124
#define TK_VERBOSE 125
#define TK_NK_BOOL 126
#define TK_RATIO 127
#define TK_NULL 128
#define TK_NK_VARIABLE 129
#define TK_NK_UNDERLINE 130
#define TK_ROWTS 131
#define TK_TBNAME 132
#define TK_QSTARTTS 133
#define TK_QENDTS 134
#define TK_WSTARTTS 135
#define TK_WENDTS 136
#define TK_WDURATION 137
#define TK_BETWEEN 138
#define TK_IS 139
#define TK_NK_LT 140
#define TK_NK_GT 141
#define TK_NK_LE 142
#define TK_NK_GE 143
#define TK_NK_NE 144
#define TK_MATCH 145
#define TK_NMATCH 146
#define TK_IN 147
#define TK_JOIN 148
#define TK_INNER 149
#define TK_SELECT 150
#define TK_DISTINCT 151
#define TK_WHERE 152
#define TK_PARTITION 153
#define TK_BY 154
#define TK_SESSION 155
#define TK_STATE_WINDOW 156
#define TK_SLIDING 157
#define TK_FILL 158
#define TK_VALUE 159
#define TK_NONE 160
#define TK_PREV 161
#define TK_LINEAR 162
#define TK_NEXT 163
#define TK_GROUP 164
#define TK_HAVING 165
#define TK_ORDER 166
#define TK_SLIMIT 167
#define TK_SOFFSET 168
#define TK_LIMIT 169
#define TK_OFFSET 170
#define TK_ASC 171
#define TK_DESC 172
#define TK_NULLS 173
#define TK_FIRST 174
#define TK_LAST 175
#define TK_DESC 123
#define TK_DESCRIBE 124
#define TK_RESET 125
#define TK_QUERY 126
#define TK_EXPLAIN 127
#define TK_ANALYZE 128
#define TK_VERBOSE 129
#define TK_NK_BOOL 130
#define TK_RATIO 131
#define TK_NULL 132
#define TK_NK_VARIABLE 133
#define TK_NK_UNDERLINE 134
#define TK_ROWTS 135
#define TK_TBNAME 136
#define TK_QSTARTTS 137
#define TK_QENDTS 138
#define TK_WSTARTTS 139
#define TK_WENDTS 140
#define TK_WDURATION 141
#define TK_BETWEEN 142
#define TK_IS 143
#define TK_NK_LT 144
#define TK_NK_GT 145
#define TK_NK_LE 146
#define TK_NK_GE 147
#define TK_NK_NE 148
#define TK_MATCH 149
#define TK_NMATCH 150
#define TK_IN 151
#define TK_JOIN 152
#define TK_INNER 153
#define TK_SELECT 154
#define TK_DISTINCT 155
#define TK_WHERE 156
#define TK_PARTITION 157
#define TK_BY 158
#define TK_SESSION 159
#define TK_STATE_WINDOW 160
#define TK_SLIDING 161
#define TK_FILL 162
#define TK_VALUE 163
#define TK_NONE 164
#define TK_PREV 165
#define TK_LINEAR 166
#define TK_NEXT 167
#define TK_GROUP 168
#define TK_HAVING 169
#define TK_ORDER 170
#define TK_SLIMIT 171
#define TK_SOFFSET 172
#define TK_LIMIT 173
#define TK_OFFSET 174
#define TK_ASC 175
#define TK_NULLS 176
#define TK_FIRST 177
#define TK_LAST 178
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "tmsg.h"
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
......@@ -20,8 +20,14 @@
extern "C" {
#endif
#include "query.h"
#include "querynodes.h"
#define DESCRIBE_RESULT_COLS 4
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
typedef struct SDatabaseOptions {
ENodeType type;
int32_t numOfBlocks;
......@@ -247,6 +253,13 @@ typedef struct SAlterLocalStmt {
char value[TSDB_DNODE_VALUE_LEN];
} SAlterLocalStmt;
typedef struct SDescribeStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
STableMeta* pMeta;
} SDescribeStmt;
#ifdef __cplusplus
}
#endif
......
......@@ -101,6 +101,8 @@ typedef enum ENodeType {
QUERY_NODE_DROP_TOPIC_STMT,
QUERY_NODE_ALTER_LOCAL_STMT,
QUERY_NODE_EXPLAIN_STMT,
QUERY_NODE_DESCRIBE_STMT,
QUERY_NODE_RESET_QUERY_CACHE_STMT,
QUERY_NODE_SHOW_DATABASES_STMT,
QUERY_NODE_SHOW_TABLES_STMT,
QUERY_NODE_SHOW_STABLES_STMT,
......
......@@ -55,6 +55,7 @@ typedef struct SQuery {
SArray* pDbList;
SArray* pTableList;
bool showRewrite;
bool localCmd;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries(
taos
INTERFACE api
PRIVATE os util common transport nodes parser planner catalog scheduler function qcom
PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom
)
if(${BUILD_TEST})
......
#include "clientInt.h"
#include "clientLog.h"
#include "command.h"
#include "scheduler.h"
#include "tdatablock.h"
#include "tdef.h"
......@@ -170,6 +171,15 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
return code;
}
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp);
}
return code;
}
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
pRequest->type = pMsgInfo->msgType;
......@@ -259,7 +269,9 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQuery), _return);
if (pQuery->directRpc) {
if (pQuery->localCmd) {
CHECK_CODE_GOTO(execLocalCmd(pRequest, pQuery), _return);
} else if (pQuery->directRpc) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
} else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
......@@ -464,9 +476,11 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
assert(pMsg->ahandle != NULL);
......@@ -647,6 +661,11 @@ void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) {
}
}
if (pResultInfo->completed) {
pResultInfo->numOfRows = 0;
return NULL;
}
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
......
......@@ -17,3 +17,4 @@ add_subdirectory(tfs)
add_subdirectory(monitor)
add_subdirectory(nodes)
add_subdirectory(scalar)
add_subdirectory(command)
aux_source_directory(src COMMAND_SRC)
add_library(command STATIC ${COMMAND_SRC})
target_include_directories(
command
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/command"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
command
PRIVATE os util nodes catalog function transport qcom
)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "command.h"
#include "tdatablock.h"
// #define SET_VARSTR(pData, val, pOffset)
static void buildRspData(const STableMeta* pMeta, char* pData) {
int32_t* pColSizes = (int32_t*)pData;
pData += DESCRIBE_RESULT_COLS * sizeof(int32_t);
int32_t numOfRows = TABLE_TOTAL_COL_NUM(pMeta);
// Field
int32_t* pOffset = (int32_t*)pData;
pData += numOfRows * sizeof(int32_t);
for (int32_t i = 0; i < numOfRows; ++i) {
STR_TO_VARSTR(pData, pMeta->schema[i].name);
int16_t len = varDataTLen(pData);
pData += len;
*pOffset = pColSizes[0];
pOffset += 1;
pColSizes[0] += len;
}
// Type
pOffset = (int32_t*)pData;
pData += numOfRows * sizeof(int32_t);
for (int32_t i = 0; i < numOfRows; ++i) {
STR_TO_VARSTR(pData, tDataTypes[pMeta->schema[i].type].name);
int16_t len = varDataTLen(pData);
pData += len;
*pOffset = pColSizes[1];
pOffset += 1;
pColSizes[1] += len;
}
// Length
pData += BitmapLen(numOfRows);
for (int32_t i = 0; i < numOfRows; ++i) {
*(int32_t*)pData = pMeta->schema[i].bytes;
pData += sizeof(int32_t);
}
pColSizes[2] = sizeof(int32_t) * numOfRows;
// Note
pOffset = (int32_t*)pData;
pData += numOfRows * sizeof(int32_t);
for (int32_t i = 0; i < numOfRows; ++i) {
STR_TO_VARSTR(pData, i >= pMeta->tableInfo.numOfColumns ? "TAG" : "");
int16_t len = varDataTLen(pData);
pData += len;
*pOffset = pColSizes[3];
pOffset += 1;
pColSizes[3] += len;
}
for (int32_t i = 0; i < DESCRIBE_RESULT_COLS; ++i) {
pColSizes[i] = htonl(pColSizes[i]);
}
}
static int32_t calcRspSize(const STableMeta* pMeta) {
int32_t numOfRows = TABLE_TOTAL_COL_NUM(pMeta);
return sizeof(SRetrieveTableRsp) +
(numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_FIELD_LEN) +
(numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_TYPE_LEN) +
(BitmapLen(numOfRows) + numOfRows * sizeof(int32_t)) +
(numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_NOTE_LEN);
}
static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
SDescribeStmt* pDesc = (SDescribeStmt*)pStmt;
*pRsp = taosMemoryCalloc(1, calcRspSize(pDesc->pMeta));
if (NULL == *pRsp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pRsp)->useconds = 0;
(*pRsp)->completed = 1;
(*pRsp)->precision = 0;
(*pRsp)->compressed = 0;
(*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(TABLE_TOTAL_COL_NUM(pDesc->pMeta));
buildRspData(pDesc->pMeta, (*pRsp)->data);
return TSDB_CODE_SUCCESS;
}
static int32_t execResetQueryCache() {
// todo
return TSDB_CODE_SUCCESS;
}
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) {
switch (nodeType(pStmt)) {
case QUERY_NODE_DESCRIBE_STMT:
return execDescribe(pStmt, pRsp);
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
return execResetQueryCache();
default:
break;
}
return TSDB_CODE_FAILED;
}
MESSAGE(STATUS "build command unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(commandTest ${SOURCE_LIST})
TARGET_INCLUDE_DIRECTORIES(
commandTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/command/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/command/inc"
)
TARGET_LINK_LIBRARIES(
commandTest
PUBLIC os util common nodes parser catalog transport gtest function qcom
)
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -136,6 +136,10 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SDropTopicStmt));
case QUERY_NODE_EXPLAIN_STMT:
return makeNode(type, sizeof(SExplainStmt));
case QUERY_NODE_DESCRIBE_STMT:
return makeNode(type, sizeof(SDescribeStmt));
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
return makeNode(type, sizeof(SNode));
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
case QUERY_NODE_SHOW_STABLES_STMT:
......
......@@ -163,6 +163,8 @@ SNode* createDefaultExplainOptions(SAstCreateContext* pCxt);
SNode* setExplainVerbose(SAstCreateContext* pCxt, SNode* pOptions, const SToken* pVal);
SNode* setExplainRatio(SAstCreateContext* pCxt, SNode* pOptions, const SToken* pVal);
SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions, SNode* pQuery);
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable);
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
#ifdef __cplusplus
}
......
......@@ -25,6 +25,7 @@ extern "C" {
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
#ifdef __cplusplus
}
......
......@@ -339,7 +339,14 @@ cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_expression(C).
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS db_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, &C); }
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
/************************************************ select **************************************************************/
/************************************************ desc/describe *******************************************************/
cmd ::= DESC full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
cmd ::= DESCRIBE full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
/************************************************ reset query cache ***************************************************/
cmd ::= RESET QUERY CACHE. { pCxt->pRootNode = createResetQueryCacheStmt(pCxt); }
/************************************************ explain *************************************************************/
cmd ::= EXPLAIN analyze_opt(A) explain_options(B) query_expression(C). { pCxt->pRootNode = createExplainStmt(pCxt, A, B, C); }
%type analyze_opt { bool }
......
......@@ -1343,3 +1343,18 @@ SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions,
pStmt->pQuery = pQuery;
return (SNode*)pStmt;
}
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable) {
SDescribeStmt* pStmt = nodesMakeNode(QUERY_NODE_DESCRIBE_STMT);
CHECK_OUT_OF_MEM(pStmt);
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pRealTable);
return (SNode*)pStmt;
}
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt) {
SNode* pStmt = nodesMakeNode(QUERY_NODE_RESET_QUERY_CACHE_STMT);
CHECK_OUT_OF_MEM(pStmt);
return pStmt;
}
......@@ -51,6 +51,7 @@ static SKeyword keywordTable[] = {
{"DAYS", TK_DAYS},
{"DELAY", TK_DELAY},
{"DESC", TK_DESC},
{"DESCRIBE", TK_DESCRIBE},
{"DISTINCT", TK_DISTINCT},
{"DNODE", TK_DNODE},
{"DNODES", TK_DNODES},
......@@ -111,9 +112,11 @@ static SKeyword keywordTable[] = {
{"QNODE", TK_QNODE},
{"QNODES", TK_QNODES},
{"QSTARTTS", TK_QSTARTTS},
{"QUERY", TK_QUERY},
{"QUORUM", TK_QUORUM},
{"RATIO", TK_RATIO},
{"REPLICA", TK_REPLICA},
{"RESET", TK_RESET},
{"RETENTIONS", TK_RETENTIONS},
{"ROLLUP", TK_ROLLUP},
{"ROWTS", TK_ROWTS},
......@@ -186,7 +189,6 @@ static SKeyword keywordTable[] = {
// {"SCORES", TK_SCORES},
// {"GRANTS", TK_GRANTS},
// {"DOT", TK_DOT},
// {"DESCRIBE", TK_DESCRIBE},
// {"SYNCDB", TK_SYNCDB},
// {"LOCAL", TK_LOCAL},
// {"PPS", TK_PPS},
......@@ -203,8 +205,6 @@ static SKeyword keywordTable[] = {
// {"EVERY", TK_EVERY},
// {"VARIABLE", TK_VARIABLE},
// {"UPDATE", TK_UPDATE},
// {"RESET", TK_RESET},
// {"QUERY", TK_QUERY},
// {"ADD", TK_ADD},
// {"COLUMN", TK_COLUMN},
// {"TAG", TK_TAG},
......
......@@ -1812,6 +1812,10 @@ static int32_t translateExplain(STranslateContext* pCxt, SExplainStmt* pStmt) {
return translateQuery(pCxt, pStmt->pQuery);
}
static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt) {
return getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta);
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
......@@ -1896,6 +1900,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_EXPLAIN_STMT:
code = translateExplain(pCxt, (SExplainStmt*)pNode);
break;
case QUERY_NODE_DESCRIBE_STMT:
code = translateDescribe(pCxt, (SDescribeStmt*)pNode);
break;
default:
break;
}
......@@ -1913,42 +1920,83 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
return code;
}
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
if (NULL == pRoot) {
return TSDB_CODE_SUCCESS;
static int32_t extractSelectResultSchema(const SSelectStmt* pSelect, int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = LIST_LENGTH(pSelect->pProjectionList);
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (QUERY_NODE_SELECT_STMT == nodeType(pRoot)) {
SSelectStmt* pSelect = (SSelectStmt*) pRoot;
*numOfCols = LIST_LENGTH(pSelect->pProjectionList);
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pNode;
int32_t index = 0;
FOREACH(pNode, pSelect->pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
(*pSchema)[index].type = pExpr->resType.type;
(*pSchema)[index].bytes = pExpr->resType.bytes;
(*pSchema)[index].colId = index + 1;
strcpy((*pSchema)[index].name, pExpr->aliasName);
index +=1;
}
SNode* pNode;
int32_t index = 0;
FOREACH(pNode, pSelect->pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
(*pSchema)[index].type = pExpr->resType.type;
(*pSchema)[index].bytes = pExpr->resType.bytes;
(*pSchema)[index].colId = index + 1;
strcpy((*pSchema)[index].name, pExpr->aliasName);
index +=1;
}
} else if (QUERY_NODE_EXPLAIN_STMT == nodeType(pRoot)) {
*numOfCols = 1;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = TSDB_EXPLAIN_RESULT_ROW_SIZE;
return TSDB_CODE_SUCCESS;
}
static int32_t extractExplainResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = 1;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = TSDB_EXPLAIN_RESULT_ROW_SIZE;
return TSDB_CODE_SUCCESS;
}
static int32_t extractDescribeResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = DESCRIBE_RESULT_COLS;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = DESCRIBE_RESULT_FIELD_LEN;
strcpy((*pSchema)[0].name, "Field");
(*pSchema)[1].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[1].bytes = DESCRIBE_RESULT_TYPE_LEN;
strcpy((*pSchema)[1].name, "Type");
(*pSchema)[2].type = TSDB_DATA_TYPE_INT;
(*pSchema)[2].bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
strcpy((*pSchema)[2].name, "Length");
(*pSchema)[3].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[3].bytes = DESCRIBE_RESULT_NOTE_LEN;
strcpy((*pSchema)[3].name, "Note");
return TSDB_CODE_SUCCESS;
}
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
if (NULL == pRoot) {
return TSDB_CODE_SUCCESS;
}
switch (nodeType(pRoot)) {
case QUERY_NODE_SELECT_STMT:
return extractSelectResultSchema((SSelectStmt*)pRoot, numOfCols, pSchema);
case QUERY_NODE_EXPLAIN_STMT:
return extractExplainResultSchema(numOfCols, pSchema);
case QUERY_NODE_DESCRIBE_STMT:
return extractDescribeResultSchema(numOfCols, pSchema);
default:
break;
}
return TSDB_CODE_FAILED;
}
static void destroyTranslateContext(STranslateContext* pCxt) {
if (NULL != pCxt->pNsLevel) {
size_t size = taosArrayGetSize(pCxt->pNsLevel);
......@@ -2530,25 +2578,30 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SELECT_STMT:
case QUERY_NODE_EXPLAIN_STMT:
pQuery->haveResultSet = true;
pQuery->directRpc = false;
pQuery->msgType = TDMT_VND_QUERY;
if (TSDB_CODE_SUCCESS != qExtractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
break;
case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->haveResultSet = false;
pQuery->directRpc = false;
pQuery->msgType = TDMT_VND_CREATE_TABLE;
break;
case QUERY_NODE_DESCRIBE_STMT:
pQuery->localCmd = true;
pQuery->haveResultSet = true;
break;
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
pQuery->localCmd = true;
break;
default:
pQuery->haveResultSet = false;
pQuery->directRpc = true;
pQuery->pCmdMsg = pCxt->pCmdMsg;
pCxt->pCmdMsg = NULL;
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg, SCmdMsgInfo*);
pQuery->msgType = pQuery->pCmdMsg->msgType;
break;
}
if (pQuery->haveResultSet) {
if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (NULL != pCxt->pDbs) {
pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN);
......
......@@ -62,3 +62,7 @@ void qDestroyQuery(SQuery* pQueryNode) {
taosArrayDestroy(pQueryNode->pTableList);
taosMemoryFreeClear(pQueryNode);
}
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
return extractResultSchema(pRoot, numOfCols, pSchema);
}
\ No newline at end of file
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -719,6 +719,9 @@ static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExc
if (NULL == pScan->pScanCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
code = sortScanCols(pScan->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册