diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h
new file mode 100644
index 0000000000000000000000000000000000000000..01104ab5d8086c278e795892048df54a2451d77b
--- /dev/null
+++ b/include/libs/parser/parsenodes.h
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_PARSENODES_H_
+#define _TD_PARSENODES_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "catalog.h"
+#include "common.h"
+#include "function.h"
+#include "tmsgtype.h"
+#include "tname.h"
+#include "tvariant.h"
+
+/*
+ * The first field of a node of any type is guaranteed to be the int16_t.
+ * Hence the type of any node can be gotten by casting it to SQueryNode.
+ */
+typedef struct SQueryNode {
+  int16_t type;
+} SQueryNode;
+
+#define nodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
+
+typedef struct SField {
+  char name[TSDB_COL_NAME_LEN];
+  uint8_t type;
+  int16_t bytes;
+} SField;
+
+typedef struct SFieldInfo {
+  int16_t numOfOutput; // number of column in result
+  SField *final;
+  SArray *internalField; // SArray
+} SFieldInfo;
+
+typedef struct SCond {
+  uint64_t uid;
+  int32_t len; // length of tag query condition data
+  char *  cond;
+} SCond;
+
+typedef struct SJoinNode {
+  uint64_t uid;
+  int16_t tagColId;
+  SArray* tsJoin;
+  SArray* tagJoin;
+} SJoinNode;
+
+typedef struct SJoinInfo {
+  bool hasJoin;
+  SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
+} SJoinInfo;
+
+typedef struct STagCond {
+  int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
+  SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
+  SJoinInfo joinInfo; // join condition, only support two tables join currently
+  SArray *pCond; // for different table, the query condition must be seperated
+} STagCond;
+
+typedef struct STableMetaInfo {
+  STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
+  SVgroupsInfo *vgroupList;
+  SName name;
+  char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
+  SArray *tagColList; // SArray, involved tag columns
+} STableMetaInfo;
+
+typedef struct SColumnIndex {
+  int16_t tableIndex;
+  int16_t columnIndex;
+  int16_t type; // normal column/tag/ user input constant column
+} SColumnIndex;
+
+// select statement
+typedef struct SQueryStmtInfo {
+  int16_t command; // the command may be different for each subclause, so keep it seperately.
+  uint32_t type; // query/insert type
+  STimeWindow window; // the whole query time window
+  SInterval interval; // tumble time window
+  SSessionWindow sessionWindow; // session time window
+  SStateWindow stateWindow; // state window query
+  SGroupbyExpr groupbyExpr; // groupby tags info
+  SArray * colList; // SArray
+  SFieldInfo fieldsInfo;
+  SArray** exprList; // SArray
+  SLimit limit;
+  SLimit slimit;
+  STagCond tagCond;
+  SArray * colCond;
+  SArray * order;
+  int16_t numOfTables;
+  int16_t curTableIdx;
+  STableMetaInfo **pTableMetaInfo;
+  struct STSBuf *tsBuf;
+
+  int16_t fillType; // final result fill type
+  int64_t * fillVal; // default value for fill
+  int32_t numOfFillVal; // fill value size
+
+  char * msg; // pointer to the pCmd->payload to keep error message temporarily
+  int64_t clauseLimit; // limit for current sub clause
+
+  int64_t prjOffset; // offset value in the original sql expression, only applied at client side
+  int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
+
+  int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
+  int32_t bufLen;
+  char* buf;
+  SArray *pUdfInfo;
+
+  struct SQueryStmtInfo *sibling; // sibling
+  struct SQueryStmtInfo *pDownstream;
+  SMultiFunctionsDesc info;
+  SArray *pUpstream; // SArray
+  int32_t havingFieldNum;
+  int32_t exprListLevelIndex;
+} SQueryStmtInfo;
+
+typedef enum {
+  PAYLOAD_TYPE_KV = 0,
+  PAYLOAD_TYPE_RAW = 1,
+} EPayloadType;
+
+typedef struct SVgDataBlocks {
+  SVgroupInfo vg;
+  int32_t numOfTables; // number of tables in current submit block
+  uint32_t size;
+  char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
+} SVgDataBlocks;
+
+typedef struct SInsertStmtInfo {
+  int16_t nodeType;
+  SArray* pDataBlocks; // data block for each vgroup, SArray.
+  int8_t schemaAttache; // denote if submit block is built with table schema or not
+  uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
+  uint32_t insertType; // insert data from [file|sql statement| bound statement]
+  const char* sql; // current sql statement position
+} SInsertStmtInfo;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_PARSENODES_H_*/
diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h
index df2170423f899b5ba1a8f2d9472e1fea4ccb72a3..737d5c258c5d201ad81399a3129c6956f6bd9a4b 100644
--- a/include/libs/parser/parser.h
+++ b/include/libs/parser/parser.h
@@ -20,108 +20,7 @@
 extern "C" {
 #endif
 
-#include "catalog.h"
-#include "common.h"
-#include "tname.h"
-#include "tvariant.h"
-#include "function.h"
-
-typedef struct SField {
-  char name[TSDB_COL_NAME_LEN];
-  uint8_t type;
-  int16_t bytes;
-} SField;
-
-typedef struct SFieldInfo {
-  int16_t numOfOutput; // number of column in result
-  SField *final;
-  SArray *internalField; // SArray
-} SFieldInfo;
-
-typedef struct SCond {
-  uint64_t uid;
-  int32_t len; // length of tag query condition data
-  char *  cond;
-} SCond;
-
-typedef struct SJoinNode {
-  uint64_t uid;
-  int16_t tagColId;
-  SArray* tsJoin;
-  SArray* tagJoin;
-} SJoinNode;
-
-typedef struct SJoinInfo {
-  bool hasJoin;
-  SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
-} SJoinInfo;
-
-typedef struct STagCond {
-  int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
-  SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
-  SJoinInfo joinInfo; // join condition, only support two tables join currently
-  SArray *pCond; // for different table, the query condition must be seperated
-} STagCond;
-
-typedef struct STableMetaInfo {
-  STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
-  SVgroupsInfo *vgroupList;
-  SName name;
-  char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
-  SArray *tagColList; // SArray, involved tag columns
-} STableMetaInfo;
-
-typedef struct SQueryStmtInfo {
-  int16_t command; // the command may be different for each subclause, so keep it seperately.
-  uint32_t type; // query/insert type
-  STimeWindow window; // the whole query time window
-  SInterval interval; // tumble time window
-  SSessionWindow sessionWindow; // session time window
-  SStateWindow stateWindow; // state window query
-  SGroupbyExpr groupbyExpr; // groupby tags info
-  SArray * colList; // SArray
-  SFieldInfo fieldsInfo;
-  SArray** exprList; // SArray
-  SLimit limit;
-  SLimit slimit;
-  STagCond tagCond;
-  SArray * colCond;
-  SArray * order;
-  int16_t numOfTables;
-  int16_t curTableIdx;
-  STableMetaInfo **pTableMetaInfo;
-  struct STSBuf *tsBuf;
-
-  int16_t fillType; // final result fill type
-  int64_t * fillVal; // default value for fill
-  int32_t numOfFillVal; // fill value size
-
-  char * msg; // pointer to the pCmd->payload to keep error message temporarily
-  int64_t clauseLimit; // limit for current sub clause
-
-  int64_t prjOffset; // offset value in the original sql expression, only applied at client side
-  int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
-
-  int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
-  int32_t bufLen;
-  char* buf;
-  SArray *pUdfInfo;
-
-  struct SQueryStmtInfo *sibling; // sibling
-  struct SQueryStmtInfo *pDownstream;
-  SMultiFunctionsDesc info;
-  SArray *pUpstream; // SArray
-  int32_t havingFieldNum;
-  int32_t exprListLevelIndex;
-} SQueryStmtInfo;
-
-typedef struct SColumnIndex {
-  int16_t tableIndex;
-  int16_t columnIndex;
-  int16_t type; // normal column/tag/ user input constant column
-} SColumnIndex;
-
-struct SInsertStmtInfo;
+#include "parsenodes.h"
 
 /**
  * True will be returned if the input sql string is insert, false otherwise.
@@ -156,26 +55,6 @@ typedef struct SParseContext {
  */
 int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t* type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen);
 
-typedef enum {
-  PAYLOAD_TYPE_KV = 0,
-  PAYLOAD_TYPE_RAW = 1,
-} EPayloadType;
-
-typedef struct SVgDataBlocks {
-  int64_t vgId; // virtual group id
-  int32_t numOfTables; // number of tables in current submit block
-  uint32_t size;
-  char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
-} SVgDataBlocks;
-
-typedef struct SInsertStmtInfo {
-  SArray* pDataBlocks; // data block for each vgroup, SArray.
-  int8_t schemaAttache; // denote if submit block is built with table schema or not
-  uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
-  uint32_t insertType; // insert data from [file|sql statement| bound statement]
-  const char* sql; // current sql statement position
-} SInsertStmtInfo;
-
 /**
  * Parse the insert sql statement.
  * @param pStr sql string
@@ -211,9 +90,9 @@ typedef struct SSourceParam {
 SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, const char* funcName, SSourceParam* pSource, SSchema* pResSchema, int16_t interSize);
 int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
 int32_t copyAllExprInfo(SArray* dst, const SArray* src, bool deepcopy);
-int32_t getExprFunctionLevel(SQueryStmtInfo* pQueryInfo);
+int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo);
 
-STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
+STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
 SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
 SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h
index 8b54b88b28fd908bf2ad9040918cdcf9a4c4eea0..153cf0bb3ec2e4ad33f1e04ebff53a709e3e555c 100644
--- a/include/libs/planner/planner.h
+++ b/include/libs/planner/planner.h
@@ -25,6 +25,7 @@ extern "C" {
 #define QUERY_TYPE_MERGE 1
 #define QUERY_TYPE_PARTIAL 2
 #define QUERY_TYPE_SCAN 3
+#define QUERY_TYPE_MODIFY 4
 
 enum OPERATOR_TYPE_E {
   OP_Unknown,
@@ -58,18 +59,17 @@ typedef struct SQueryNodeBasicInfo {
 
 typedef struct SDataSink {
   SQueryNodeBasicInfo info;
-  SDataBlockSchema schema;
 } SDataSink;
 
 typedef struct SDataDispatcher {
   SDataSink sink;
-  // todo
 } SDataDispatcher;
 
 typedef struct SDataInserter {
   SDataSink sink;
-  uint64_t uid; // unique id of the table
-  // todo data field
+  int32_t numOfTables;
+  uint32_t size;
+  char *pData;
 } SDataInserter;
 
 typedef struct SPhyNode {
@@ -119,12 +119,13 @@ typedef struct SSubplanId {
 
 typedef struct SSubplan {
   SSubplanId id; // unique id of the subplan
-  int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
-  int32_t level; // the execution level of current subplan, starting from 0.
-  SEpSet execEpSet; // for the scan sub plan, the optional execution node
-  SArray *pChildern; // the datasource subplan,from which to fetch the result
-  SArray *pParents; // the data destination subplan, get data from current subplan
-  SPhyNode *pNode; // physical plan of current subplan
+  int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
+  int32_t level; // the execution level of current subplan, starting from 0.
+  SEpSet execEpSet; // for the scan sub plan, the optional execution node
+  SArray *pChildern; // the datasource subplan,from which to fetch the result
+  SArray *pParents; // the data destination subplan, get data from current subplan
+  SPhyNode *pNode; // physical plan of current subplan
+  SDataSink *pDataSink; // data of the subplan flow into the datasink
 } SSubplan;
 
 typedef struct SQueryDag {
@@ -133,10 +134,12 @@
   SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
 } SQueryDag;
 
+struct SQueryNode;
+
 /**
  * Create the physical plan for the query, according to the AST.
  */
-int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
+int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
 
 // Set datasource of this subplan, multiple calls may be made to a subplan.
 // @subplan subplan to be schedule
@@ -144,7 +147,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
 // @ep one execution location of this group of datasource subplans
 int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
 
-int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
+int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
 
 /**
  * Convert to subplan to string for the scheduler to send to the executor
diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h
index fbd92b89f85aa789f3c50360da372ec38b2bd812..69dad4d9f4b792ed573628f05ddb95f301f8cc08 100644
--- a/source/libs/parser/inc/dataBlockMgt.h
+++ b/source/libs/parser/inc/dataBlockMgt.h
@@ -78,7 +78,7 @@ typedef struct {
 typedef struct STableDataBlocks {
   int8_t tsSource; // where does the UNIX timestamp come from, server or client
   bool ordered; // if current rows are ordered or not
-  int64_t vgId; // virtual group id
+  int32_t vgId; // virtual group id
   int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
   int32_t numOfTables; // number of tables in current submit block
   int32_t rowSize; // row size for current table
diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c
index d37ac74d218ea4df8f3dcbdf22693f5ed2a08c68..2f3d5aab77ecaea9bf30ab2113f6636bcee7e231 100644
--- a/source/libs/parser/src/dataBlockMgt.c
+++ b/source/libs/parser/src/dataBlockMgt.c
@@ -123,7 +123,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
     dataBuf->nAllocSize = dataBuf->headerSize * 2;
   }
 
-  //dataBuf->pData = calloc(1, dataBuf->nAllocSize);
   dataBuf->pData = malloc(dataBuf->nAllocSize);
   if (dataBuf->pData == NULL) {
     tfree(dataBuf);
@@ -145,8 +144,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
   dataBuf->tsSource = -1;
   dataBuf->vgId = dataBuf->pTableMeta->vgId;
 
-  // tNameAssign(&dataBuf->tableName, name);
-
   assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
 
   *dataBlocks = dataBuf;
diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c
index 2481896f092b650586c77ad8b6ebeede9127b4f7..9aba2e43ce81cef42fccf922c4c9aa8d8279e1d0 100644
--- a/source/libs/parser/src/insertParser.c
+++ b/source/libs/parser/src/insertParser.c
@@ -50,15 +50,16 @@
 };
 
 typedef struct SInsertParseContext {
-  SParseContext* pComCxt;
-  const char* pSql;
-  SMsgBuf msg;
-  STableMeta* pTableMeta;
-  SParsedDataColInfo tags;
-  SKVRowBuilder tagsBuilder;
-  SHashObj* pTableBlockHashObj;
-  SArray* pTableDataBlocks;
-  SArray* pVgDataBlocks;
+  SParseContext* pComCxt;       // input
+  const char* pSql;             // input
+  SMsgBuf msg;                  // input
+  STableMeta* pTableMeta;       // each table
+  SParsedDataColInfo tags;      // each table
+  SKVRowBuilder tagsBuilder;    // each table
+  SHashObj* pVgroupsHashObj;    // global
+  SHashObj* pTableBlockHashObj; // global
+  SArray* pTableDataBlocks;     // global
+  SArray* pVgDataBlocks;        // global
   int32_t totalNum;
   SInsertStmtInfo* pOutput;
 } SInsertParseContext;
@@ -173,10 +174,12 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
   char tableName[TSDB_TABLE_NAME_LEN] = {0};
   CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
   CHECK_CODE(catalogGetTableMeta(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta));
+  SVgroupInfo vg;
+  CHECK_CODE(catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &vg));
+  CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
   return TSDB_CODE_SUCCESS;
 }
 
-// todo speedup by using hash list
 static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
   while (start < end) {
     if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
@@ -187,16 +190,16 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS
   return -1;
 }
 
-static void fillMsgHeader(SVgDataBlocks* dst) {
-  SMsgDesc* desc = (SMsgDesc*)dst->pData;
+static void buildMsgHeader(SVgDataBlocks* blocks) {
+  SMsgDesc* desc = (SMsgDesc*)blocks->pData;
   desc->numOfVnodes = htonl(1);
   SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
-  submit->header.vgId = htonl(dst->vgId);
-  submit->header.contLen = htonl(dst->size - sizeof(SMsgDesc));
+  submit->header.vgId = htonl(blocks->vg.vgId);
+  submit->header.contLen = htonl(blocks->size - sizeof(SMsgDesc));
   submit->length = submit->header.contLen;
-  submit->numOfBlocks = htonl(dst->numOfTables);
+  submit->numOfBlocks = htonl(blocks->numOfTables);
   SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
-  int32_t numOfBlocks = dst->numOfTables;
+  int32_t numOfBlocks = blocks->numOfTables;
   while (numOfBlocks--) {
     int32_t dataLen = blk->dataLen;
     blk->uid = htobe64(blk->uid);
@@ -222,11 +225,11 @@ static int32_t buildOutput(SInsertParseContext* pCxt) {
     if (NULL == dst) {
       return TSDB_CODE_TSC_OUT_OF_MEMORY;
     }
-    dst->vgId = src->vgId;
+    taosHashGetClone(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
     dst->numOfTables = src->numOfTables;
    dst->size = src->size;
     SWAP(dst->pData, src->pData, char*);
-    fillMsgHeader(dst);
+    buildMsgHeader(dst);
     taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
   }
   return TSDB_CODE_SUCCESS;
@@ -546,7 +549,7 @@ static FORCE_INLINE int32_t parseOneValue(SInsertParseContext* pCxt, SToken* pTo
     }
     case TSDB_DATA_TYPE_BINARY: {
       // too long values will return invalid sql, not be truncated automatically
-      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
+      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
         return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z);
       }
       return func(pToken->z, pToken->n, param);
@@ -644,9 +647,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
     CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param));
   }
 
-  destroyBoundColumnInfo(&pCxt->tags);
   SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
-  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
   if (NULL == row) {
     return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
   }
@@ -799,13 +800,30 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
   return TSDB_CODE_SUCCESS;
 }
 
+static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
+  tfree(pCxt->pTableMeta);
+  destroyBoundColumnInfo(&pCxt->tags);
+  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
+}
+
+static void destroyInsertParseContext(SInsertParseContext* pCxt) {
+  destroyInsertParseContextForTable(pCxt);
+  taosHashCleanup(pCxt->pVgroupsHashObj);
+  taosHashCleanup(pCxt->pTableBlockHashObj);
+  destroyBlockArrayList(pCxt->pTableDataBlocks);
+  destroyBlockArrayList(pCxt->pVgDataBlocks);
+}
+
 // tb_name
 //   [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
 //   [(field1_name, ...)]
 //   VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
 // [...];
 static int32_t parseInsertBody(SInsertParseContext* pCxt) {
+  // for each table
   while (1) {
+    destroyInsertParseContextForTable(pCxt);
+
     SToken sToken;
     // pSql -> tb_name ...
     NEXT_TOKEN(pCxt->pSql, sToken);
@@ -867,15 +885,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
   return buildOutput(pCxt);
 }
 
-static void destroyInsertParseContext(SInsertParseContext* pCxt) {
-  tfree(pCxt->pTableMeta);
-  destroyBoundColumnInfo(&pCxt->tags);
-  tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
-  taosHashCleanup(pCxt->pTableBlockHashObj);
-  destroyBlockArrayList(pCxt->pTableDataBlocks);
-  destroyBlockArrayList(pCxt->pVgDataBlocks);
-}
-
 // INSERT INTO
 //   tb_name
 //     [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
@@ -888,12 +897,13 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
     .pSql = pContext->pSql,
     .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
     .pTableMeta = NULL,
+    .pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
    .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
     .totalNum = 0,
     .pOutput = calloc(1, sizeof(SInsertStmtInfo))
   };
 
-  if (NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
+  if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
     terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
     return TSDB_CODE_FAILED;
   }
diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c
index b72bc06324d3a33e960b713c421e0af77a3dd86d..0239bf71ddac3732b533e6b6d6ae21d861fe0590 100644
--- a/source/libs/parser/src/parserUtil.c
+++ b/source/libs/parser/src/parserUtil.c
@@ -1167,7 +1167,7 @@ void cleanupTagCond(STagCond* pTagCond) {
  * @param tableIndex denote the table index for join query, where more than one table exists
  * @return
  */
-STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex) {
+STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex) {
   assert(pQueryInfo != NULL);
   if (pQueryInfo->pTableMetaInfo == NULL) {
     assert(pQueryInfo->numOfTables == 0);
diff --git a/source/libs/parser/src/queryInfoUtil.c b/source/libs/parser/src/queryInfoUtil.c
index 502946675b40738c8ec7d3e35287669ce04dab71..1ae0d9211a2925b27b415becfab1fc385d242965 100644
--- a/source/libs/parser/src/queryInfoUtil.c
+++ b/source/libs/parser/src/queryInfoUtil.c
@@ -354,7 +354,7 @@ bool tscHasColumnFilter(SQueryStmtInfo* pQueryInfo) {
   return false;
 }
 
-int32_t getExprFunctionLevel(SQueryStmtInfo* pQueryInfo) {
+int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo) {
   int32_t n = 10;
 
   int32_t level = 0;
diff --git a/source/libs/parser/test/insertParserTest.cpp b/source/libs/parser/test/insertParserTest.cpp
index 433d189a453053c84bb11672e4f9586d30c02566..29a2677d1bc1229e3fb3e6eb392608e0969b1106 100644
--- a/source/libs/parser/test/insertParserTest.cpp
+++ b/source/libs/parser/test/insertParserTest.cpp
@@ -69,7 +69,7 @@ protected:
     cout << "schemaAttache:" << (int32_t)res_->schemaAttache << ", payloadType:" << (int32_t)res_->payloadType << ", insertType:" << res_->insertType << ", numOfVgs:" << num << endl;
     for (size_t i = 0; i < num; ++i) {
       SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
-      cout << "vgId:" << vg->vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
+      cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
       SMsgDesc* desc = (SMsgDesc*)(vg->pData);
       cout << "numOfVnodes:" << ntohl(desc->numOfVnodes) << endl;
       SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp
index fdcb6e0433b89a84f05646690d506387765a9d9d..6d8973366803dc30f5cd29f5baed40f0563facf3 100644
--- a/source/libs/parser/test/mockCatalog.cpp
+++ b/source/libs/parser/test/mockCatalog.cpp
@@ -42,11 +42,15 @@ void generateTestST1(MockCatalogService* mcs) {
 }
 
 int32_t __catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
-  return mockCatalogService->catalogGetHandle(clusterId, catalogHandle);
+  return 0;
 }
 
 int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
-  return mockCatalogService->catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, pTableMeta);
+  return mockCatalogService->catalogGetTableMeta(pDBName, pTableName, pTableMeta);
+}
+
+int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) {
+  return mockCatalogService->catalogGetTableHashVgroup(pDBName, pTableName, vgInfo);
 }
 
 void initMetaDataEnv() {
@@ -55,6 +59,7 @@
   static Stub stub;
   stub.set(catalogGetHandle, __catalogGetHandle);
   stub.set(catalogGetTableMeta, __catalogGetTableMeta);
+  stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup);
   {
     AddrAny any("libcatalog.so");
     std::map result;
@@ -71,6 +76,14 @@
       stub.set(f.second, __catalogGetTableMeta);
     }
   }
+  {
+    AddrAny any("libcatalog.so");
+    std::map result;
+    any.get_global_func_addr_dynsym("^catalogGetTableHashVgroup$", result);
+    for (const auto& f : result) {
+      stub.set(f.second, __catalogGetTableHashVgroup);
+    }
+  }
 }
 
 void generateMetaData() {
diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp
index ad366e8f744ccb81bab33fc8f02cf426ee1eff4b..e234f82da93aad8612e57673fd89f64025d52685 100644
--- a/source/libs/parser/test/mockCatalogService.cpp
+++ b/source/libs/parser/test/mockCatalogService.cpp
@@ -90,11 +90,11 @@ public:
   MockCatalogServiceImpl() : id_(1) {
   }
 
-  int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) const {
+  int32_t catalogGetHandle() const {
     return 0;
   }
 
-  int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
+  int32_t catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
     std::unique_ptr table;
     int32_t code = copyTableSchemaMeta(pDBName, pTableName, &table);
     if (TSDB_CODE_SUCCESS != code) {
@@ -104,6 +104,11 @@ public:
     return TSDB_CODE_SUCCESS;
   }
 
+  int32_t catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const {
+    // todo
+    return 0;
+  }
+
   TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
     builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags);
     meta_[db][tbname] = builder_->table();
@@ -270,10 +275,10 @@ std::shared_ptr MockCatalogService::getTableMeta(const std::strin
   return impl_->getTableMeta(db, tbname);
 }
 
-int32_t MockCatalogService::catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) const {
-  return impl_->catalogGetHandle(clusterId, catalogHandle);
+int32_t MockCatalogService::catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
+  return impl_->catalogGetTableMeta(pDBName, pTableName, pTableMeta);
 }
 
-int32_t MockCatalogService::catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
-  return impl_->catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, pTableMeta);
-}
+int32_t MockCatalogService::catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const {
+  return impl_->catalogGetTableHashVgroup(pDBName, pTableName, vgInfo);
+}
\ No newline at end of file
diff --git a/source/libs/parser/test/mockCatalogService.h b/source/libs/parser/test/mockCatalogService.h
index cd01db09cce7ff4c5949671ff44af041a47d80cb..889a45439715792d09a33d4c9073b4175a275dca 100644
--- a/source/libs/parser/test/mockCatalogService.h
+++ b/source/libs/parser/test/mockCatalogService.h
@@ -57,9 +57,8 @@ public:
   void showTables() const;
   std::shared_ptr getTableMeta(const std::string& db, const std::string& tbname) const;
 
-  // mock interface
-  int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) const;
-  int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const;
+  int32_t catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const;
+  int32_t catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const;
 
 private:
   std::unique_ptr impl_;
diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp
index a2078defdaf1acbe5b9e0824f08a934371345d6c..f9b172210627cede4727f8e1a3eeaa2eb9ee3ae1 100644
--- a/source/libs/parser/test/plannerTest.cpp
+++ b/source/libs/parser/test/plannerTest.cpp
@@ -93,7 +93,7 @@ void generateLogicplan(const char* sql) {
   ASSERT_EQ(ret, 0);
 
   struct SQueryPlanNode* n = nullptr;
-  code = createQueryPlan(pQueryInfo, &n);
+  code = createQueryPlan((const SQueryNode*)pQueryInfo, &n);
 
   char* str = NULL;
   queryPlanToString(n, &str);
@@ -156,7 +156,7 @@ TEST(testCase, planner_test) {
   ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
 
   struct SQueryPlanNode* n = nullptr;
-  code = createQueryPlan(pQueryInfo, &n);
+  code = createQueryPlan((const SQueryNode*)pQueryInfo, &n);
 
   char* str = NULL;
   queryPlanToString(n, &str);
diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h
index b1c4643f154f56be96a096d09161e1f0aa74540a..1bee95b8e56965f424488e38c3fe2da6a66de677 100644
--- a/source/libs/planner/inc/plannerInt.h
+++ b/source/libs/planner/inc/plannerInt.h
@@ -40,6 +40,7 @@ extern "C" {
 #define QNODE_SESSIONWINDOW 12
 #define QNODE_STATEWINDOW 13
 #define QNODE_FILL 14
+#define QNODE_INSERT 15
 
 typedef struct SQueryDistPlanNodeInfo {
   bool stableQuery; // super table query or not
@@ -82,7 +83,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode);
  * @param pQueryNode
  * @return
 */
-int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode);
+int32_t createQueryPlan(const SQueryNode* pNode, struct SQueryPlanNode** pQueryPlan);
 
 /**
  * Convert the query plan to string, in order to display it in the shell.
diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c
index ce923314bd9530b6425fff926cb3b9662f57b813..136073aa6030945962b6740bfd3f74d55b30fc78 100644
--- a/source/libs/planner/src/logicPlan.c
+++ b/source/libs/planner/src/logicPlan.c
@@ -29,7 +29,7 @@ typedef struct SJoinCond {
   SColumn *colCond[2];
 } SJoinCond;
 
-static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
+static SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo);
 static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
 int32_t printExprInfo(char* buf, const SQueryPlanNode* pQueryNode, int32_t len);
 
@@ -37,13 +37,37 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
   return 0;
 }
 
-int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
-  SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
+int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) {
+  *pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
+  SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
+  if (NULL == *pQueryPlan || NULL == blocks) {
+    return TSDB_CODE_TSC_OUT_OF_MEMORY;
+  }
+  (*pQueryPlan)->info.type = QNODE_INSERT;
+  taosArrayAddAll(blocks, pInsert->pDataBlocks);
+  (*pQueryPlan)->pExtInfo = blocks;
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) {
+  SArray* upstream = createQueryPlanImpl(pSelect);
   assert(taosArrayGetSize(upstream) == 1);
+  *pQueryPlan = taosArrayGetP(upstream, 0);
+  taosArrayDestroy(upstream);
+  return TSDB_CODE_SUCCESS;
+}
 
-  *pQueryNode = taosArrayGetP(upstream, 0);
+int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
+  switch (nodeType(pNode)) {
+    case TSDB_SQL_SELECT: {
+      return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan);
+    }
+    case TSDB_SQL_INSERT:
+      return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan);
+    default:
+      return TSDB_CODE_FAILED;
+  }
 
-  taosArrayDestroy(upstream);
   return TSDB_CODE_SUCCESS;
 }
 
@@ -62,7 +86,7 @@ void destroyQueryPlan(SQueryPlanNode* pQueryNode) {
 
 //======================================================================================================================
 static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
-                                       SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) {
+                                       SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) {
   SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
 
   pNode->info.type = type;
@@ -123,7 +147,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
     }
     case QNODE_FILL: { // todo !!
-      pNode->pExtInfo = pExtInfo;
+      pNode->pExtInfo = (void*)pExtInfo;
       break;
     }
 
@@ -145,7 +169,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
   return pNode;
 }
 
-static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info,
+static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info,
                                             SArray* pExprs, SArray* tableCols) {
   if (pQueryInfo->info.onlyTagQuery) {
     int32_t num = (int32_t) taosArrayGetSize(pExprs);
@@ -186,7 +210,7 @@
   return pNode;
 }
 
-static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
+static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
   // group by column not by tag
   size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
 
@@ -239,7 +263,7 @@
     memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr);
 
     SArray* p = pQueryInfo->exprList[0]; // top expression in select clause
-    pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p, taosArrayGetSize(p), pInfo);
+    pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p->pData, taosArrayGetSize(p), pInfo);
   }
 
   if (pQueryInfo->order != NULL) {
@@ -254,7 +278,7 @@
   return pNode;
 }
 
-static SQueryPlanNode* doCreateQueryPlanForSingleTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
+static SQueryPlanNode* doCreateQueryPlanForSingleTable(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
                                                        SArray* tableCols) {
   char name[TSDB_TABLE_FNAME_LEN] = {0};
   tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
@@ -286,7 +310,7 @@ static bool isAllAggExpr(SArray* pList) {
   return true;
 }
 
-SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
+SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
   SArray* upstream = NULL;
 
   if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause
diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c
index 0696a5b24e82b771d897257d1de008853b93dc08..9db9e059b33df0bfe8f6e70492b8b6151bb0a395 100644
--- a/source/libs/planner/src/physicalPlan.c
+++ b/source/libs/planner/src/physicalPlan.c
@@ -14,6 +14,7 @@
 */
 
 #include "plannerInt.h"
+#include "exception.h"
 #include "parser.h"
 
 #define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
@@ -33,6 +34,13 @@ static const char* gOpName[] = {
 #undef INCLUDE_AS_NAME
 };
 
+static void* vailidPointer(void* p) {
+  if (NULL == p) {
+    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
+  }
+  return p;
+}
+
 const char* opTypeToOpName(int32_t type) {
   return gOpName[type];
 }
@@ -46,6 +54,25 @@ int32_t opNameToOpType(const char* name) {
   return OP_Unknown;
 }
 
+static SDataSink* initDataSink(int32_t type, int32_t size) {
+  SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size));
+  sink->info.type = type;
+  return sink;
+}
+
+static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
+  SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
+  return (SDataSink*)dispatcher;
+}
+
+static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
+  SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter));
+  inserter->numOfTables = pBlocks->numOfTables;
+  inserter->size = pBlocks->size;
+  SWAP(inserter->pData, pBlocks->pData, char*);
+  return (SDataSink*)inserter;
+}
+
 static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
   dataBlockSchema->numOfCols = pPlanNode->numOfCols;
   dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols);
@@ -72,10 +99,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
 }
 
 static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
-  SPhyNode* node = (SPhyNode*)calloc(1, size);
-  if (NULL == node) {
-    return NULL;
-  }
+  SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size));
   node->info.type = type;
   node->info.name = opTypeToOpName(type);
   if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
@@ -138,7 +162,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
 }
 
 static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
-  SSubplan* subplan = calloc(1, sizeof(SSubplan));
+  SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan)));
   subplan->id = pCxt->nextId;
   ++(pCxt->nextId.subplanId);
   subplan->type = type;
@@ -146,15 +170,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
   if (NULL != pCxt->pCurrentSubplan) {
     subplan->level = pCxt->pCurrentSubplan->level + 1;
     if (NULL == pCxt->pCurrentSubplan->pChildern) {
-      pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
+      pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
     }
     taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
-    subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
+    subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
     taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
   }
   SArray* currentLevel;
   if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
-    currentLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
+    currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
     taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
   } else {
     currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
@@ -164,7 +188,17 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
   return subplan;
 }
 
-static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
+static void vgroupInfoToEpSet(const SVgroupInfo* vg, SEpSet* epSet) {
+  epSet->inUse = 0; // todo
+  epSet->numOfEps = vg->numOfEps;
+  for (int8_t i = 0; i < vg->numOfEps; ++i) {
+    epSet->port[i] = vg->epAddr[i].port;
+    strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
+  }
+  return;
+}
+
+static void vgroupMsgToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
   epSet->inUse = 0; // todo
   epSet->numOfEps = vg->numOfEps;
   for (int8_t i = 0; i < vg->numOfEps; ++i) {
@@ -179,8 +213,9 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
   for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
     STORE_CURRENT_SUBPLAN(pCxt);
     SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
-    vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
+    vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
     subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
+    subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
     RECOVERY_CURRENT_SUBPLAN(pCxt);
   }
   return pCxt->nextId.templateId++;
@@ -214,6 +249,9 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
     case QNODE_TABLESCAN:
       node = createTableScanNode(pCxt, pPlanNode);
       break;
+    case QNODE_INSERT:
+      // Insert is not an operator in a physical plan.
+      break;
     default:
       assert(false);
   }
@@ -229,26 +267,46 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
   return node;
 }
 
+static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
+  SArray* vgs = (SArray*)pPlanNode->pExtInfo;
+  size_t numOfVg = taosArrayGetSize(vgs);
+  for (int32_t i = 0; i < numOfVg; ++i) {
+    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
+    SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
+    vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
+    subplan->pNode = NULL;
+    subplan->pDataSink = createDataInserter(pCxt, blocks);
+  }
+}
+
 static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
-  SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
-  ++(pCxt->nextId.templateId);
-  subplan->pNode = createPhyNode(pCxt, pRoot);
+  if (QNODE_INSERT == pRoot->info.type) {
+    splitInsertSubplan(pCxt, pRoot);
+  } else {
+    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
+    ++(pCxt->nextId.templateId);
+    subplan->pNode = createPhyNode(pCxt, pRoot);
+    subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
+  }
   // todo deal subquery
 }
 
 int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) {
-  SPlanContext context = {
-    .pCatalog = pCatalog,
-    .pDag = calloc(1, sizeof(SQueryDag)),
-    .pCurrentSubplan = NULL,
-    .nextId = {0} // todo queryid
-  };
-  if (NULL == context.pDag) {
-    return TSDB_CODE_TSC_OUT_OF_MEMORY;
-  }
-  context.pDag->pSubplans = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
-  createSubplanByLevel(&context, pQueryNode);
-  *pDag = context.pDag;
+  TRY(TSDB_MAX_TAG_CONDITIONS) {
+    SPlanContext context = {
+      .pCatalog = pCatalog,
+      .pDag = vailidPointer(calloc(1, sizeof(SQueryDag))),
+      .pCurrentSubplan = NULL,
+      .nextId = {0} // todo queryid
+    };
+    *pDag = context.pDag;
+    context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
+    createSubplanByLevel(&context, pQueryNode);
+  } CATCH(code) {
+    CLEANUP_EXECUTE();
+    terrno = code;
+    return TSDB_CODE_FAILED;
+  } END_TRY
   return TSDB_CODE_SUCCESS;
 }
diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c
index 2510797158978e3ba8fc4f0a6732a2f0a7c40ac6..15c0e632a7d7ccf1e2fa32f89e66f85f86e0259f 100644
--- a/source/libs/planner/src/physicalPlanJson.c
+++ b/source/libs/planner/src/physicalPlanJson.c
@@ -651,7 +651,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
 static const char* jkPnodeName = "Name";
 static const char* jkPnodeTargets = "Targets";
 static const char* jkPnodeConditions = "Conditions";
-static const char* jkPnodeSchema = "Schema";
+static const char* jkPnodeSchema = "InputSchema";
 static const char* jkPnodeChildren = "Children";
 // The 'pParent' field do not need to be serialized.
 static bool phyNodeToJson(const void* obj, cJSON* jNode) {
diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c
index e9a4591d4afe8b4b0efed0f322c9be6ba5c4ee40..7722a7d3639ffcfef62ef9610de5d1ff90daa4c4 100644
--- a/source/libs/planner/src/planner.c
+++ b/source/libs/planner/src/planner.c
@@ -24,9 +24,9 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
   // todo
 }
 
-int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag) {
+int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SEpSet* pEpSet, struct SQueryDag** pDag) {
   SQueryPlanNode* logicPlan;
-  int32_t code = createQueryPlan(pQueryInfo, &logicPlan);
+  int32_t code = createQueryPlan(pNode, &logicPlan);
   if (TSDB_CODE_SUCCESS != code) {
     destroyQueryPlan(logicPlan);
     return code;