提交 92cd252b 编写于 作者: X Xiaoyu Wang

TD-13706 integration create table stmt and insert stmt

上级 2beeacf1
......@@ -85,54 +85,55 @@
#define TK_DECIMAL 67
#define TK_SHOW 68
#define TK_DATABASES 69
#define TK_NK_FLOAT 70
#define TK_NK_BOOL 71
#define TK_NK_VARIABLE 72
#define TK_BETWEEN 73
#define TK_IS 74
#define TK_NULL 75
#define TK_NK_LT 76
#define TK_NK_GT 77
#define TK_NK_LE 78
#define TK_NK_GE 79
#define TK_NK_NE 80
#define TK_NK_EQ 81
#define TK_LIKE 82
#define TK_MATCH 83
#define TK_NMATCH 84
#define TK_IN 85
#define TK_FROM 86
#define TK_AS 87
#define TK_JOIN 88
#define TK_ON 89
#define TK_INNER 90
#define TK_SELECT 91
#define TK_DISTINCT 92
#define TK_WHERE 93
#define TK_PARTITION 94
#define TK_BY 95
#define TK_SESSION 96
#define TK_STATE_WINDOW 97
#define TK_INTERVAL 98
#define TK_SLIDING 99
#define TK_FILL 100
#define TK_VALUE 101
#define TK_NONE 102
#define TK_PREV 103
#define TK_LINEAR 104
#define TK_NEXT 105
#define TK_GROUP 106
#define TK_HAVING 107
#define TK_ORDER 108
#define TK_SLIMIT 109
#define TK_SOFFSET 110
#define TK_LIMIT 111
#define TK_OFFSET 112
#define TK_ASC 113
#define TK_DESC 114
#define TK_NULLS 115
#define TK_FIRST 116
#define TK_LAST 117
#define TK_TABLES 70
#define TK_NK_FLOAT 71
#define TK_NK_BOOL 72
#define TK_NK_VARIABLE 73
#define TK_BETWEEN 74
#define TK_IS 75
#define TK_NULL 76
#define TK_NK_LT 77
#define TK_NK_GT 78
#define TK_NK_LE 79
#define TK_NK_GE 80
#define TK_NK_NE 81
#define TK_NK_EQ 82
#define TK_LIKE 83
#define TK_MATCH 84
#define TK_NMATCH 85
#define TK_IN 86
#define TK_FROM 87
#define TK_AS 88
#define TK_JOIN 89
#define TK_ON 90
#define TK_INNER 91
#define TK_SELECT 92
#define TK_DISTINCT 93
#define TK_WHERE 94
#define TK_PARTITION 95
#define TK_BY 96
#define TK_SESSION 97
#define TK_STATE_WINDOW 98
#define TK_INTERVAL 99
#define TK_SLIDING 100
#define TK_FILL 101
#define TK_VALUE 102
#define TK_NONE 103
#define TK_PREV 104
#define TK_LINEAR 105
#define TK_NEXT 106
#define TK_GROUP 107
#define TK_HAVING 108
#define TK_ORDER 109
#define TK_SLIMIT 110
#define TK_SOFFSET 111
#define TK_LIMIT 112
#define TK_OFFSET 113
#define TK_ASC 114
#define TK_DESC 115
#define TK_NULLS 116
#define TK_FIRST 117
#define TK_LAST 118
#define TK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -74,7 +74,8 @@ typedef enum ENodeType {
QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_USE_DATABASE_STMT,
QUERY_NODE_SHOW_DATABASE_STMT, // temp
QUERY_NODE_SHOW_DATABASES_STMT, // temp
QUERY_NODE_SHOW_TABLES_STMT, // temp
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
......@@ -128,6 +129,7 @@ void nodesDestroyNode(SNodeptr pNode);
SNodeList* nodesMakeList();
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode);
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode);
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index);
......
......@@ -43,6 +43,7 @@ typedef struct SScanLogicNode {
SLogicNode node;
SNodeList* pScanCols;
struct STableMeta* pMeta;
SVgroupsInfo* pVgroupList;
EScanType scanType;
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange;
......@@ -84,7 +85,6 @@ typedef struct SSubLogicPlan {
SNodeList* pChildren;
SNodeList* pParents;
SLogicNode* pNode;
SQueryNodeAddr execNode;
ESubplanType subplanType;
int32_t level;
} SSubLogicPlan;
......
......@@ -123,6 +123,7 @@ struct STableMeta;
typedef struct SRealTableNode {
STableNode table; // QUERY_NODE_REAL_TABLE
struct STableMeta* pMeta;
SVgroupsInfo* pVgroupList;
} SRealTableNode;
typedef struct STempTableNode {
......
......@@ -38,8 +38,9 @@ typedef struct SParseContext {
typedef struct SCmdMsgInfo {
int16_t msgType;
SEpSet epSet;
char* pMsg;
void* pMsg;
int32_t msgLen;
void* pExtension; // todo remove it soon
} SCmdMsgInfo;
typedef struct SQuery {
......@@ -50,6 +51,7 @@ typedef struct SQuery {
int32_t numOfResCols;
SSchema* pResSchema;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -28,7 +28,7 @@ typedef struct SPlanContext {
} SPlanContext;
// Create the physical plan for the query, according to the AST.
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan);
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
......
......@@ -176,6 +176,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
if (pMsgInfo->msgType == TDMT_VND_SHOW_TABLES) {
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
if (pShowReqInfo->pArray == NULL) {
pShowReqInfo->currentIndex = 0; // set the first vnode/ then iterate the next vnode
pShowReqInfo->pArray = pMsgInfo->pExtension;
}
}
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
......@@ -183,10 +191,10 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
return TSDB_CODE_SUCCESS;
}
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pDag, SArray* pNodeList) {
pRequest->type = pQuery->sqlNodeType;
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
pRequest->type = pQuery->msgType;
SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot };
int32_t code = qCreateQueryPlan(&cxt, pDag);
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
if (code != 0) {
return code;
}
......@@ -219,7 +227,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code;
}
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.queryJob != 0) {
......
......@@ -160,7 +160,6 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
COPY_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(execNode);
COPY_SCALAR_FIELD(subplanType);
return (SNode*)pDst;
}
......
......@@ -80,8 +80,10 @@ const char* nodesNodeName(ENodeType type) {
return "CreateTableStmt";
case QUERY_NODE_USE_DATABASE_STMT:
return "UseDatabaseStmt";
case QUERY_NODE_SHOW_DATABASE_STMT:
case QUERY_NODE_SHOW_DATABASES_STMT:
return "ShowDatabaseStmt";
case QUERY_NODE_SHOW_TABLES_STMT:
return "ShowTablesStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -1322,7 +1324,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_CREATE_TABLE_STMT:
case QUERY_NODE_USE_DATABASE_STMT:
case QUERY_NODE_SHOW_DATABASE_STMT:
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
return logicScanNodeToJson(pObj, pJson);
......
......@@ -86,7 +86,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SCreateTableStmt));
case QUERY_NODE_USE_DATABASE_STMT:
return makeNode(type, sizeof(SUseDatabaseStmt));
case QUERY_NODE_SHOW_DATABASE_STMT:
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
return makeNode(type, sizeof(SNode));;
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
......@@ -202,6 +203,17 @@ int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) {
return TSDB_CODE_SUCCESS;
}
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) {
if (NULL == pNode) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = nodesListAppend(pList, pNode);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pNode);
}
return code;
}
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
if (NULL == pTarget || NULL == pSrc) {
return TSDB_CODE_SUCCESS;
......
......@@ -101,8 +101,8 @@ cmd ::= CREATE TABLE exists_opt(A) full_table_name(B)
%type full_table_name { STokenPair }
%destructor full_table_name { }
full_table_name(A) ::= NK_ID(B). { STokenPair t = { .first = B, .second = nil_token}; A = t; }
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { STokenPair t = { .first = B, .second = C}; A = t; }
full_table_name(A) ::= NK_ID(B). { STokenPair t = { .first = nil_token, .second = B }; A = t; }
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { STokenPair t = { .first = B, .second = C }; A = t; }
%type column_def_list { SNodeList* }
%destructor column_def_list { nodesDestroyList($$); }
......@@ -146,7 +146,8 @@ table_options(A) ::= table_options(B) KEEP NK_INTEGER(C).
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
/************************************************ show ***************************************************************/
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASE_STMT); }
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASES_STMT); }
cmd ::= SHOW TABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TABLES_STMT); }
/************************************************ select *************************************************************/
cmd ::= query_expression(A). { PARSER_TRACE; pCxt->pRootNode = A; }
......
......@@ -714,18 +714,18 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDat
strncpy(pCol->colName, pColName->z, pColName->n);
pCol->dataType = dataType;
if (NULL != pComment) {
strncpy(pCol->colName, pColName->z, pColName->n);
strncpy(pCol->comments, pComment->z, pComment->n);
}
return (SNode*)pCol;
}
SDataType createDataType(uint8_t type) {
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = 0 };
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = tDataTypes[type].bytes };
return dt;
}
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = 0 };
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = tDataTypes[type].bytes };
return dt;
}
......@@ -751,7 +751,7 @@ SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
}
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type) {
SNode* pStmt = nodesMakeNode(QUERY_NODE_SHOW_DATABASE_STMT);;
SNode* pStmt = nodesMakeNode(type);;
CHECK_OUT_OF_MEM(pStmt);
return pStmt;
}
......@@ -39,6 +39,7 @@ static void setQuery(SAstCreateContext* pCxt, SQuery* pQuery) {
pQuery->haveResultSet = false;
pQuery->directRpc = true;
}
pQuery->msgType = (QUERY_NODE_CREATE_TABLE_STMT == type ? TDMT_VND_CREATE_TABLE : TDMT_VND_QUERY);
}
int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
......
......@@ -561,6 +561,29 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
return TSDB_CODE_SUCCESS;
}
static int32_t setTableVgroupList(STranslateContext *pCxt, SName* name, SVgroupsInfo **pVgList) {
SArray* vgroupList = NULL;
int32_t code = catalogGetTableDistVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), name, &vgroupList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
size_t vgroupNum = taosArrayGetSize(vgroupList);
SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum);
vgList->numOfVgroups = vgroupNum;
for (int32_t i = 0; i < vgroupNum; ++i) {
SVgroupInfo *vg = taosArrayGet(vgroupList, i);
vgList->vgroups[i] = *vg;
}
*pVgList = vgList;
taosArrayDestroy(vgroupList);
return TSDB_CODE_SUCCESS;
}
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pTable)) {
......@@ -572,6 +595,10 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
}
code = setTableVgroupList(pCxt, &name, &(pRealTable->pVgroupList));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = addNamespace(pCxt, pRealTable);
break;
}
......@@ -852,11 +879,7 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateShow(STranslateContext* pCxt) {
static int32_t translateShowDatabases(STranslateContext* pCxt) {
SShowReq showReq = { .type = TSDB_MGMT_TABLE_DB };
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
......@@ -875,6 +898,34 @@ static int32_t translateShow(STranslateContext* pCxt) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateShowTables(STranslateContext* pCxt) {
SName name = {0};
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
SArray* array = NULL;
int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, false, &array);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SVgroupInfo* info = taosArrayGet(array, 0);
pShowReq->head.vgId = htonl(info->vgId);
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
if (NULL== pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCxt->pCmdMsg->epSet = info->epset;
pCxt->pCmdMsg->msgType = TDMT_VND_SHOW_TABLES;
pCxt->pCmdMsg->msgLen = sizeof(SVShowTablesReq);
pCxt->pCmdMsg->pMsg = pShowReq;
pCxt->pCmdMsg->pExtension = array;
return TSDB_CODE_SUCCESS;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
......@@ -887,10 +938,11 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_USE_DATABASE_STMT:
code = translateUseDatabase(pCxt, (SUseDatabaseStmt*)pNode);
break;
case QUERY_NODE_SHOW_DATABASE_STMT:
code = translateShow(pCxt);
case QUERY_NODE_CREATE_TABLE_STMT:
code = translateCreateTable(pCxt, (SCreateTableStmt*)pNode);
case QUERY_NODE_SHOW_DATABASES_STMT:
code = translateShowDatabases(pCxt);
break;
case QUERY_NODE_SHOW_TABLES_STMT:
code = translateShowTables(pCxt);
break;
default:
break;
......@@ -942,9 +994,10 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info;
} SVgroupTablesBatch;
static void toSchema(const SColumnNode* pCol, SSchema* pSchema) {
pSchema->type = pCol->node.resType.type;
pSchema->bytes = pCol->node.resType.bytes;
static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema) {
pSchema->colId = colId;
pSchema->type = pCol->dataType.type;
pSchema->bytes = pCol->dataType.bytes;
strcpy(pSchema->name, pCol->colName);
}
......@@ -960,7 +1013,8 @@ static int32_t doBuildSingleTableBatchReq(SName* pTableName, SNodeList* pColumns
SNode* pCol;
int32_t index = 0;
FOREACH(pCol, pColumns) {
toSchema((SColumnNode*)pCol, req.ntbCfg.pSchema + index++);
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
++index;
}
pBatch->info = *pVgroupInfo;
......@@ -1018,7 +1072,11 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(tableName.dbname, pStmt->dbName);
if ('\0' == pStmt->dbName[0]) {
strcpy(tableName.dbname, pCxt->pParseCxt->db);
} else {
strcpy(tableName.dbname, pStmt->dbName);
}
strcpy(tableName.tname, pStmt->tableName);
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &tableName, &info);
......
......@@ -159,12 +159,9 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
SName name = {0};
createSName(&name, pTname, pCxt->pComCxt, &pCxt->msg);
char tableName[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&name, tableName);
SParseContext* pBasicCtx = pCxt->pComCxt;
SName name = {0};
createSName(&name, pTname, pBasicCtx, &pCxt->msg);
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
SVgroupInfo vg;
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
......@@ -939,6 +936,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
*pQuery = calloc(1, sizeof(SQuery));
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->directRpc = false;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
......
此差异已折叠。
......@@ -88,7 +88,7 @@ static SKeyword keywordTable[] = {
// {"SCORES", TK_SCORES},
// {"GRANTS", TK_GRANTS},
// {"DOT", TK_DOT},
// {"TABLES", TK_TABLES},
{"TABLES", TK_TABLES},
// {"STABLES", TK_STABLES},
{"VGROUPS", TK_VGROUPS},
// {"DROP", TK_DROP},
......
......@@ -50,7 +50,7 @@ extern "C" {
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan, SArray* pExecNodeList);
#ifdef __cplusplus
}
......
......@@ -128,6 +128,7 @@ static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
pScan->node.id = pCxt->planNodeId++;
pScan->pMeta = pRealTable->pMeta;
pScan->pVgroupList = pRealTable->pVgroupList;
// set columns to scan
SNodeList* pCols = NULL;
......
......@@ -27,6 +27,7 @@ typedef struct SPhysiPlanContext {
int32_t errCode;
int16_t nextDataBlockId;
SArray* pLocationHelper;
SArray* pExecNodeList;
} SPhysiPlanContext;
static int32_t getSlotKey(SNode* pNode, char* pKey) {
......@@ -185,11 +186,41 @@ static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SData
return TSDB_CODE_SUCCESS;
}
static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_ALLOC(pCol, NULL);
pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
pCol->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
pCol->tableId = tableId;
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pCol->colType = COLUMN_TYPE_COLUMN;
return pCol;
}
static int32_t addPrimaryKeyCol(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode) {
if (NULL == pScanPhysiNode->pScanCols) {
pScanPhysiNode->pScanCols = nodesMakeList();
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid)));
return TSDB_CODE_SUCCESS;
}
SNode* pNode;
FOREACH(pNode, pScanPhysiNode->pScanCols) {
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) {
return TSDB_CODE_SUCCESS;
}
}
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid)));
return TSDB_CODE_SUCCESS;
}
static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) {
if (NULL != pScanLogicNode->pScanCols) {
pScanPhysiNode->pScanCols = nodesCloneList(pScanLogicNode->pScanCols);
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
}
CHECK_CODE(addPrimaryKeyCol(pCxt, pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY);
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
......@@ -206,6 +237,11 @@ static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanL
return TSDB_CODE_SUCCESS;
}
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
pNodeAddr->nodeId = vg->vgId;
pNodeAddr->epset = vg->epset;
}
static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
CHECK_ALLOC(pTagScan, NULL);
......@@ -213,21 +249,23 @@ static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNod
return (SPhysiNode*)pTagScan;
}
static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
CHECK_ALLOC(pTableScan, NULL);
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
pTableScan->scanFlag = pScanLogicNode->scanFlag;
pTableScan->scanRange = pScanLogicNode->scanRange;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
return (SPhysiNode*)pTableScan;
}
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
switch (pScanLogicNode->scanType) {
case SCAN_TYPE_TAG:
return createTagScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_TABLE:
return createTableScanPhysiNode(pCxt, pScanLogicNode);
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
case SCAN_TYPE_STABLE:
case SCAN_TYPE_STREAM:
break;
......@@ -428,13 +466,13 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
return (SPhysiNode*)pProject;
}
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPlan) {
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) {
SNodeList* pChildren = nodesMakeList();
CHECK_ALLOC(pChildren, NULL);
SNode* pLogicChild;
FOREACH(pLogicChild, pLogicPlan->pChildren) {
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, (SLogicNode*)pLogicChild);
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild);
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
nodesDestroyList(pChildren);
......@@ -445,7 +483,7 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
SPhysiNode* pPhyNode = NULL;
switch (nodeType(pLogicPlan)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
pPhyNode = createScanPhysiNode(pCxt, (SScanLogicNode*)pLogicPlan);
pPhyNode = createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
pPhyNode = createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicPlan);
......@@ -493,25 +531,17 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
pSubplan->pDataSink = createDataInserter(pCxt, pModif->pVgDataBlocks);
pSubplan->msgType = pModif->msgType;
pSubplan->execNode.epset = pModif->pVgDataBlocks->vg.epset;
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
} else {
pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode);
pSubplan->pNode = createPhysiNode(pCxt, pSubplan, pLogicSubplan->pNode);
pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
pSubplan->msgType = TDMT_VND_QUERY;
}
pSubplan->subplanType = pLogicSubplan->subplanType;
return pSubplan;
}
static int32_t strictListAppend(SNodeList* pList, SNodeptr pNode) {
if (NULL == pNode) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = nodesListAppend(pList, pNode);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pNode);
}
return code;
}
static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubLogicPlan** pSubLogicPlan) {
*pSubLogicPlan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
CHECK_ALLOC(*pSubLogicPlan, TSDB_CODE_OUT_OF_MEMORY);
......@@ -529,7 +559,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
if (level >= LIST_LENGTH(pSubplans)) {
pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
CHECK_ALLOC(pGroup, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(strictListAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(nodesListStrictAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY);
} else {
pGroup = nodesListGetNode(pSubplans, level);
}
......@@ -537,7 +567,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
pGroup->pNodeList = nodesMakeList();
CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY);
}
CHECK_CODE(strictListAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(nodesListStrictAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY);
}
SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) {
......@@ -562,7 +592,6 @@ static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int3
SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
pNewSubplan->execNode.epset = blocks->vg.epset;
((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks;
CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
}
......@@ -639,12 +668,13 @@ static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPl
return TSDB_CODE_SUCCESS;
}
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan) {
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan, SArray* pExecNodeList) {
SPhysiPlanContext cxt = {
.pPlanCxt = pCxt,
.errCode = TSDB_CODE_SUCCESS,
.nextDataBlockId = 0,
.pLocationHelper = taosArrayInit(32, POINTER_BYTES)
.pLocationHelper = taosArrayInit(32, POINTER_BYTES),
.pExecNodeList = pExecNodeList
};
if (NULL == cxt.pLocationHelper) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -21,14 +21,14 @@ int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return TSDB_CODE_SUCCESS;
}
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan) {
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
SLogicNode* pLogicNode = NULL;
int32_t code = createLogicPlan(pCxt, &pLogicNode);
if (TSDB_CODE_SUCCESS == code) {
code = optimize(pCxt, pLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicNode, pPlan);
code = createPhysiPlan(pCxt, pLogicNode, pPlan, pExecNodeList);
}
return code;
}
......
......@@ -71,7 +71,7 @@ protected:
if (TEST_PHYSICAL_PLAN == target) {
SQueryPlan* pPlan = nullptr;
code = createPhysiPlan(&cxt, pLogicPlan, &pPlan);
code = createPhysiPlan(&cxt, pLogicPlan, &pPlan, NULL);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册