未验证 提交 a5459df0 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #10698 from taosdata/feature/3.0_query_integrate_wxy

TD-13981 show databases rewrite
......@@ -35,7 +35,8 @@ typedef enum {
TSDB_NORMAL_TABLE = 3, // ordinary table
TSDB_STREAM_TABLE = 4, // table created from stream computing
TSDB_TEMP_TABLE = 5, // temp table created by nest query
TSDB_TABLE_MAX = 6
TSDB_SYSTEM_TABLE = 6,
TSDB_TABLE_MAX = 7
} ETableType;
typedef enum {
......
......@@ -36,7 +36,7 @@ typedef struct SLogicNode {
typedef enum EScanType {
SCAN_TYPE_TAG,
SCAN_TYPE_TABLE,
SCAN_TYPE_STABLE,
SCAN_TYPE_SYSTEM_TABLE,
SCAN_TYPE_STREAM
} EScanType;
......
......@@ -99,7 +99,7 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_INS_TABLE_MNODES "mnodes"
#define TSDB_INS_TABLE_MODULES "modules"
#define TSDB_INS_TABLE_QNODES "qnodes"
#define TSDB_INS_TABLE_USER_DATABASE "user_database"
#define TSDB_INS_TABLE_USER_DATABASES "user_databases"
#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions"
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
......
......@@ -128,7 +128,7 @@ static const SInfosTableMeta infosMeta[] = {{TSDB_INS_TABLE_DNODES, dnodesSchema
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
{TSDB_INS_TABLE_USER_DATABASE, userDBSchema, tListLen(userDBSchema)},
{TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)},
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
......@@ -165,7 +165,7 @@ int32_t mndInsInitMeta(SHashObj *hash) {
STableMetaRsp meta = {0};
strcpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB);
meta.tableType = TSDB_NORMAL_TABLE;
meta.tableType = TSDB_SYSTEM_TABLE;
meta.sversion = 1;
meta.tversion = 1;
......
......@@ -107,6 +107,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiTableSeqScan";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "PhysiSreamScan";
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
return "PhysiSystemTableScan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
......@@ -440,6 +442,14 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
return code;
}
static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) {
return physiScanNodeToJson(pObj, pJson);
}
static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
return jsonToPhysiScanNode(pJson, pObj);
}
static const char* jkProjectPhysiPlanProjections = "Projections";
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
......@@ -1492,6 +1502,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
break;
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
return physiSysTableScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return physiProjectNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
......@@ -1569,6 +1581,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiTagScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return jsonToPhysiTableScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
return jsonToPhysiSysTableScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return jsonToPhysiProjectNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
......
......@@ -146,6 +146,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(STableSeqScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return makeNode(type, sizeof(SNode));
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
return makeNode(type, sizeof(SSystemTableScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
......
......@@ -396,9 +396,9 @@ static bool checkPort(SAstCreateContext* pCxt, const SToken* pPortToken, int32_t
return pCxt->valid;
}
static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName) {
static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName, bool query) {
if (NULL == pDbName) {
return true;
return (query ? NULL != pCxt->pQueryCxt->db : true);
}
pCxt->valid = pDbName->n < TSDB_DB_NAME_LEN ? true : false;
return pCxt->valid;
......@@ -557,7 +557,7 @@ SNode* createNodeListNode(SAstCreateContext* pCxt, SNodeList* pList) {
}
SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const SToken* pTableName, const SToken* pTableAlias) {
if (!checkDbName(pCxt, pDbName) || !checkTableName(pCxt, pTableName)) {
if (!checkDbName(pCxt, pDbName, true) || !checkTableName(pCxt, pTableName)) {
return NULL;
}
SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
......@@ -769,7 +769,7 @@ SDatabaseOptions* setDatabaseOption(SAstCreateContext* pCxt, SDatabaseOptions* p
}
SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SDatabaseOptions* pOptions) {
if (!checkDbName(pCxt, pDbName)) {
if (!checkDbName(pCxt, pDbName, false)) {
return NULL;
}
SCreateDatabaseStmt* pStmt = (SCreateDatabaseStmt*)nodesMakeNode(QUERY_NODE_CREATE_DATABASE_STMT);
......@@ -782,7 +782,7 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons
}
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pDbName) {
if (!checkDbName(pCxt, pDbName)) {
if (!checkDbName(pCxt, pDbName, false)) {
return NULL;
}
SDropDatabaseStmt* pStmt = (SDropDatabaseStmt*)nodesMakeNode(QUERY_NODE_DROP_DATABASE_STMT);
......@@ -904,7 +904,7 @@ SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
}
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDbName) {
if (!checkDbName(pCxt, pDbName)) {
if (!checkDbName(pCxt, pDbName, false)) {
return NULL;
}
SShowStmt* pStmt = nodesMakeNode(type);;
......
......@@ -499,25 +499,38 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
return TSDB_CODE_SUCCESS;
}
static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) {
static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) {
size_t vgroupNum = taosArrayGetSize(pVgs);
*pVgsInfo = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum);
if (NULL == *pVgsInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pVgsInfo)->numOfVgroups = vgroupNum;
for (int32_t i = 0; i < vgroupNum; ++i) {
SVgroupInfo *vg = taosArrayGet(pVgs, i);
(*pVgsInfo)->vgroups[i] = *vg;
}
return TSDB_CODE_SUCCESS;
}
static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
SArray* vgroupList = NULL;
int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, &vgroupList);
if (code != TSDB_CODE_SUCCESS) {
return code;
code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, &vgroupList);
if (TSDB_CODE_SUCCESS == code) {
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
}
size_t vgroupNum = taosArrayGetSize(vgroupList);
pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum);
if (NULL == pRealTable->pVgroupList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pRealTable->pVgroupList->numOfVgroups = vgroupNum;
for (int32_t i = 0; i < vgroupNum; ++i) {
SVgroupInfo *vg = taosArrayGet(vgroupList, i);
pRealTable->pVgroupList->vgroups[i] = *vg;
taosArrayDestroy(vgroupList);
} else if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) {
SArray* vgroupList = NULL;
char fullDbName[TSDB_DB_FNAME_LEN];
// tNameGetFullDbName(pName, fullDbName);
snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->acctId, "test");
code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, false, &vgroupList);
if (TSDB_CODE_SUCCESS == code) {
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
}
taosArrayDestroy(vgroupList);
} else {
pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
......@@ -525,12 +538,9 @@ static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNo
return TSDB_CODE_OUT_OF_MEMORY;
}
pRealTable->pVgroupList->numOfVgroups = 1;
int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, pRealTable->pVgroupList->vgroups);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pRealTable->pVgroupList->vgroups);
}
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
......@@ -1238,6 +1248,25 @@ static void destroyTranslateContext(STranslateContext* pCxt) {
}
}
static int32_t rewriteShowDatabase(STranslateContext* pCxt, SQuery* pQuery) {
SSelectStmt* pStmt = nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == pStmt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
if (NULL == pTable) {
nodesDestroyNode(pStmt);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB);
strcpy(pTable->table.tableName, TSDB_INS_TABLE_USER_DATABASES);
pStmt->pFromTable = (SNode*)pTable;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
return TSDB_CODE_SUCCESS;
}
typedef struct SVgroupTablesBatch {
SVCreateTbBatchReq req;
SVgroupInfo info;
......@@ -1608,6 +1637,9 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pQuery->pRoot)) {
case QUERY_NODE_SHOW_DATABASES_STMT:
code = rewriteShowDatabase(pCxt, pQuery);
break;
case QUERY_NODE_CREATE_TABLE_STMT:
if (NULL == ((SCreateTableStmt*)pQuery->pRoot)->pTags) {
code = rewriteCreateTable(pCxt, pQuery);
......
......@@ -123,6 +123,26 @@ error:
return pRoot;
}
static EScanType getScanType(SNodeList* pScanCols, STableMeta* pMeta) {
if (NULL == pScanCols) {
// select count(*) from t
return SCAN_TYPE_TABLE;
}
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
return SCAN_TYPE_SYSTEM_TABLE;
}
SNode* pCol = NULL;
FOREACH(pCol, pScanCols) {
if (COLUMN_TYPE_COLUMN == ((SColumnNode*)pCol)->colType) {
return SCAN_TYPE_TABLE;
}
}
return SCAN_TYPE_TAG;
}
static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
CHECK_ALLOC(pScan, NULL);
......@@ -145,7 +165,7 @@ static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
}
pScan->scanType = SCAN_TYPE_TABLE;
pScan->scanType = getScanType(pCols, pScan->pMeta);
pScan->scanFlag = MAIN_SCAN;
pScan->scanRange = TSWINDOW_INITIALIZER;
pScan->tableName.type = TSDB_TABLE_NAME_T;
......
......@@ -259,13 +259,26 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p
return (SPhysiNode*)pTableScan;
}
static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
CHECK_ALLOC(pScan, NULL);
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan), (SPhysiNode*)pScan);
for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) {
SQueryNodeAddr addr;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr);
taosArrayPush(pCxt->pExecNodeList, &addr);
}
return (SPhysiNode*)pScan;
}
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
switch (pScanLogicNode->scanType) {
case SCAN_TYPE_TAG:
return createTagScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_TABLE:
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
case SCAN_TYPE_STABLE:
case SCAN_TYPE_SYSTEM_TABLE:
return createSystemTableScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_STREAM:
break;
default:
......@@ -768,7 +781,7 @@ static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
static int32_t doBuildPhysiPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) {
SSubplan* pSubplan = createPhysiSubplan(pCxt, pLogicSubplan);
CHECK_ALLOC(pSubplan, DEAL_RES_ERROR);
CHECK_ALLOC(pSubplan, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE_EXT(pushSubplan(pCxt, pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans));
++(pQueryPlan->numOfSubplans);
if (NULL != pParent) {
......
......@@ -166,7 +166,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) {
}
if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE &&
pMetaMsg->tableType != TSDB_NORMAL_TABLE) {
pMetaMsg->tableType != TSDB_NORMAL_TABLE && pMetaMsg->tableType != TSDB_SYSTEM_TABLE) {
qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
return TSDB_CODE_TSC_INVALID_VALUE;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册