未验证 提交 59383e1c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9292 from taosdata/feature/3.0_wxy

TD-12416 insert statement plan
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PARSENODES_H_
#define _TD_PARSENODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "catalog.h"
#include "common.h"
#include "function.h"
#include "tmsgtype.h"
#include "tname.h"
#include "tvariant.h"
/*
* The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode.
*/
typedef struct SQueryNode {
int16_t type;
} SQueryNode;
#define nodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int16_t bytes;
} SField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
SJoinInfo joinInfo; // join condition, only support two tables join currently
SArray *pCond; // for different table, the query condition must be seperated
} STagCond;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
int16_t type; // normal column/tag/ user input constant column
} SColumnIndex;
// select statement
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SStateWindow stateWindow; // state window query
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray** exprList; // SArray<SExprInfo*>
SLimit limit;
SLimit slimit;
STagCond tagCond;
SArray * colCond;
SArray * order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
struct SQueryStmtInfo *pDownstream;
SMultiFunctionsDesc info;
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
int32_t havingFieldNum;
int32_t exprListLevelIndex;
} SQueryStmtInfo;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks {
SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
int16_t nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SInsertStmtInfo;
#ifdef __cplusplus
}
#endif
#endif /*_TD_PARSENODES_H_*/
......@@ -20,108 +20,7 @@
extern "C" {
#endif
#include "catalog.h"
#include "common.h"
#include "tname.h"
#include "tvariant.h"
#include "function.h"
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int16_t bytes;
} SField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
SJoinInfo joinInfo; // join condition, only support two tables join currently
SArray *pCond; // for different table, the query condition must be seperated
} STagCond;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SStateWindow stateWindow; // state window query
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray** exprList; // SArray<SExprInfo*>
SLimit limit;
SLimit slimit;
STagCond tagCond;
SArray * colCond;
SArray * order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
struct SQueryStmtInfo *pDownstream;
SMultiFunctionsDesc info;
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
int32_t havingFieldNum;
int32_t exprListLevelIndex;
} SQueryStmtInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
int16_t type; // normal column/tag/ user input constant column
} SColumnIndex;
struct SInsertStmtInfo;
#include "parsenodes.h"
/**
* True will be returned if the input sql string is insert, false otherwise.
......@@ -156,26 +55,6 @@ typedef struct SParseContext {
*/
int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t* type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen);
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks {
int64_t vgId; // virtual group id
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SInsertStmtInfo;
/**
* Parse the insert sql statement.
* @param pStr sql string
......@@ -211,9 +90,9 @@ typedef struct SSourceParam {
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, const char* funcName, SSourceParam* pSource, SSchema* pResSchema, int16_t interSize);
int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t copyAllExprInfo(SArray* dst, const SArray* src, bool deepcopy);
int32_t getExprFunctionLevel(SQueryStmtInfo* pQueryInfo);
int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo);
STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
......
......@@ -25,6 +25,7 @@ extern "C" {
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
#define QUERY_TYPE_MODIFY 4
enum OPERATOR_TYPE_E {
OP_Unknown,
......@@ -58,18 +59,17 @@ typedef struct SQueryNodeBasicInfo {
typedef struct SDataSink {
SQueryNodeBasicInfo info;
SDataBlockSchema schema;
} SDataSink;
typedef struct SDataDispatcher {
SDataSink sink;
// todo
} SDataDispatcher;
typedef struct SDataInserter {
SDataSink sink;
uint64_t uid; // unique id of the table
// todo data field
int32_t numOfTables;
uint32_t size;
char *pData;
} SDataInserter;
typedef struct SPhyNode {
......@@ -125,6 +125,7 @@ typedef struct SSubplan {
SArray *pChildern; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan
SDataSink *pDataSink; // data of the subplan flow into the datasink
} SSubplan;
typedef struct SQueryDag {
......@@ -133,10 +134,12 @@ typedef struct SQueryDag {
SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryDag;
struct SQueryNode;
/**
* Create the physical plan for the query, according to the AST.
*/
int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
......@@ -144,7 +147,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
// @ep one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
/**
* Convert to subplan to string for the scheduler to send to the executor
......
......@@ -78,7 +78,7 @@ typedef struct {
typedef struct STableDataBlocks {
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
int32_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table
......
......@@ -123,7 +123,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
dataBuf->nAllocSize = dataBuf->headerSize * 2;
}
//dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->pData = malloc(dataBuf->nAllocSize);
if (dataBuf->pData == NULL) {
tfree(dataBuf);
......@@ -145,8 +144,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
// tNameAssign(&dataBuf->tableName, name);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
......
......@@ -50,15 +50,16 @@ enum {
};
typedef struct SInsertParseContext {
SParseContext* pComCxt;
const char* pSql;
SMsgBuf msg;
STableMeta* pTableMeta;
SParsedDataColInfo tags;
SKVRowBuilder tagsBuilder;
SHashObj* pTableBlockHashObj;
SArray* pTableDataBlocks;
SArray* pVgDataBlocks;
SParseContext* pComCxt; // input
const char* pSql; // input
SMsgBuf msg; // input
STableMeta* pTableMeta; // each table
SParsedDataColInfo tags; // each table
SKVRowBuilder tagsBuilder; // each table
SHashObj* pVgroupsHashObj; // global
SHashObj* pTableBlockHashObj; // global
SArray* pTableDataBlocks; // global
SArray* pVgDataBlocks; // global
int32_t totalNum;
SInsertStmtInfo* pOutput;
} SInsertParseContext;
......@@ -173,10 +174,12 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
char tableName[TSDB_TABLE_NAME_LEN] = {0};
CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
CHECK_CODE(catalogGetTableMeta(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &pCxt->pTableMeta));
SVgroupInfo vg;
CHECK_CODE(catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, tableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
return TSDB_CODE_SUCCESS;
}
// todo speedup by using hash list
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
while (start < end) {
if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
......@@ -187,16 +190,16 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS
return -1;
}
static void fillMsgHeader(SVgDataBlocks* dst) {
SMsgDesc* desc = (SMsgDesc*)dst->pData;
static void buildMsgHeader(SVgDataBlocks* blocks) {
SMsgDesc* desc = (SMsgDesc*)blocks->pData;
desc->numOfVnodes = htonl(1);
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
submit->header.vgId = htonl(dst->vgId);
submit->header.contLen = htonl(dst->size - sizeof(SMsgDesc));
submit->header.vgId = htonl(blocks->vg.vgId);
submit->header.contLen = htonl(blocks->size - sizeof(SMsgDesc));
submit->length = submit->header.contLen;
submit->numOfBlocks = htonl(dst->numOfTables);
submit->numOfBlocks = htonl(blocks->numOfTables);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
int32_t numOfBlocks = dst->numOfTables;
int32_t numOfBlocks = blocks->numOfTables;
while (numOfBlocks--) {
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
......@@ -222,11 +225,11 @@ static int32_t buildOutput(SInsertParseContext* pCxt) {
if (NULL == dst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
dst->vgId = src->vgId;
taosHashGetClone(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
dst->numOfTables = src->numOfTables;
dst->size = src->size;
SWAP(dst->pData, src->pData, char*);
fillMsgHeader(dst);
buildMsgHeader(dst);
taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
}
return TSDB_CODE_SUCCESS;
......@@ -546,7 +549,7 @@ static FORCE_INLINE int32_t parseOneValue(SInsertParseContext* pCxt, SToken* pTo
}
case TSDB_DATA_TYPE_BINARY: {
// too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z);
}
return func(pToken->z, pToken->n, param);
......@@ -644,9 +647,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param));
}
destroyBoundColumnInfo(&pCxt->tags);
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
if (NULL == row) {
return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
}
......@@ -799,13 +800,30 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
return TSDB_CODE_SUCCESS;
}
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
tfree(pCxt->pTableMeta);
destroyBoundColumnInfo(&pCxt->tags);
tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
}
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyInsertParseContextForTable(pCxt);
taosHashCleanup(pCxt->pVgroupsHashObj);
taosHashCleanup(pCxt->pTableBlockHashObj);
destroyBlockArrayList(pCxt->pTableDataBlocks);
destroyBlockArrayList(pCxt->pVgDataBlocks);
}
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// for each table
while (1) {
destroyInsertParseContextForTable(pCxt);
SToken sToken;
// pSql -> tb_name ...
NEXT_TOKEN(pCxt->pSql, sToken);
......@@ -867,15 +885,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return buildOutput(pCxt);
}
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
tfree(pCxt->pTableMeta);
destroyBoundColumnInfo(&pCxt->tags);
tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
taosHashCleanup(pCxt->pTableBlockHashObj);
destroyBlockArrayList(pCxt->pTableDataBlocks);
destroyBlockArrayList(pCxt->pVgDataBlocks);
}
// INSERT INTO
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
......@@ -888,12 +897,13 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
.pSql = pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pTableMeta = NULL,
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.totalNum = 0,
.pOutput = calloc(1, sizeof(SInsertStmtInfo))
};
if (NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
......
......@@ -1167,7 +1167,7 @@ void cleanupTagCond(STagCond* pTagCond) {
* @param tableIndex denote the table index for join query, where more than one table exists
* @return
*/
STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex) {
STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex) {
assert(pQueryInfo != NULL);
if (pQueryInfo->pTableMetaInfo == NULL) {
assert(pQueryInfo->numOfTables == 0);
......
......@@ -354,7 +354,7 @@ bool tscHasColumnFilter(SQueryStmtInfo* pQueryInfo) {
return false;
}
int32_t getExprFunctionLevel(SQueryStmtInfo* pQueryInfo) {
int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo) {
int32_t n = 10;
int32_t level = 0;
......
......@@ -69,7 +69,7 @@ protected:
cout << "schemaAttache:" << (int32_t)res_->schemaAttache << ", payloadType:" << (int32_t)res_->payloadType << ", insertType:" << res_->insertType << ", numOfVgs:" << num << endl;
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
cout << "vgId:" << vg->vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
SMsgDesc* desc = (SMsgDesc*)(vg->pData);
cout << "numOfVnodes:" << ntohl(desc->numOfVnodes) << endl;
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
......
......@@ -42,11 +42,15 @@ void generateTestST1(MockCatalogService* mcs) {
}
int32_t __catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
return mockCatalogService->catalogGetHandle(clusterId, catalogHandle);
return 0;
}
int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
return mockCatalogService->catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, pTableMeta);
return mockCatalogService->catalogGetTableMeta(pDBName, pTableName, pTableMeta);
}
int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) {
return mockCatalogService->catalogGetTableHashVgroup(pDBName, pTableName, vgInfo);
}
void initMetaDataEnv() {
......@@ -55,6 +59,7 @@ void initMetaDataEnv() {
static Stub stub;
stub.set(catalogGetHandle, __catalogGetHandle);
stub.set(catalogGetTableMeta, __catalogGetTableMeta);
stub.set(catalogGetTableHashVgroup, __catalogGetTableHashVgroup);
{
AddrAny any("libcatalog.so");
std::map<std::string,void*> result;
......@@ -71,6 +76,14 @@ void initMetaDataEnv() {
stub.set(f.second, __catalogGetTableMeta);
}
}
{
AddrAny any("libcatalog.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^catalogGetTableHashVgroup$", result);
for (const auto& f : result) {
stub.set(f.second, __catalogGetTableHashVgroup);
}
}
}
void generateMetaData() {
......
......@@ -90,11 +90,11 @@ public:
MockCatalogServiceImpl() : id_(1) {
}
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) const {
int32_t catalogGetHandle() const {
return 0;
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
int32_t catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
std::unique_ptr<STableMeta> table;
int32_t code = copyTableSchemaMeta(pDBName, pTableName, &table);
if (TSDB_CODE_SUCCESS != code) {
......@@ -104,6 +104,11 @@ public:
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const {
// todo
return 0;
}
TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags);
meta_[db][tbname] = builder_->table();
......@@ -270,10 +275,10 @@ std::shared_ptr<MockTableMeta> MockCatalogService::getTableMeta(const std::strin
return impl_->getTableMeta(db, tbname);
}
int32_t MockCatalogService::catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) const {
return impl_->catalogGetHandle(clusterId, catalogHandle);
int32_t MockCatalogService::catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
return impl_->catalogGetTableMeta(pDBName, pTableName, pTableMeta);
}
int32_t MockCatalogService::catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
return impl_->catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, pTableMeta);
int32_t MockCatalogService::catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const {
return impl_->catalogGetTableHashVgroup(pDBName, pTableName, vgInfo);
}
\ No newline at end of file
......@@ -57,9 +57,8 @@ public:
void showTables() const;
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const;
// mock interface
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) const;
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const;
private:
std::unique_ptr<MockCatalogServiceImpl> impl_;
......
......@@ -93,7 +93,7 @@ void generateLogicplan(const char* sql) {
ASSERT_EQ(ret, 0);
struct SQueryPlanNode* n = nullptr;
code = createQueryPlan(pQueryInfo, &n);
code = createQueryPlan((const SQueryNode*)pQueryInfo, &n);
char* str = NULL;
queryPlanToString(n, &str);
......@@ -156,7 +156,7 @@ TEST(testCase, planner_test) {
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr;
code = createQueryPlan(pQueryInfo, &n);
code = createQueryPlan((const SQueryNode*)pQueryInfo, &n);
char* str = NULL;
queryPlanToString(n, &str);
......
......@@ -40,6 +40,7 @@ extern "C" {
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
#define QNODE_INSERT 15
typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not
......@@ -82,7 +83,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode);
* @param pQueryNode
* @return
*/
int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode);
int32_t createQueryPlan(const SQueryNode* pNode, struct SQueryPlanNode** pQueryPlan);
/**
* Convert the query plan to string, in order to display it in the shell.
......
......@@ -29,7 +29,7 @@ typedef struct SJoinCond {
SColumn *colCond[2];
} SJoinCond;
static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
static SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo);
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
int32_t printExprInfo(char* buf, const SQueryPlanNode* pQueryNode, int32_t len);
......@@ -37,13 +37,37 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0;
}
int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) {
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
if (NULL == *pQueryPlan || NULL == blocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
(*pQueryPlan)->info.type = QNODE_INSERT;
taosArrayAddAll(blocks, pInsert->pDataBlocks);
(*pQueryPlan)->pExtInfo = blocks;
return TSDB_CODE_SUCCESS;
}
int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) {
SArray* upstream = createQueryPlanImpl(pSelect);
assert(taosArrayGetSize(upstream) == 1);
*pQueryPlan = taosArrayGetP(upstream, 0);
taosArrayDestroy(upstream);
return TSDB_CODE_SUCCESS;
}
*pQueryNode = taosArrayGetP(upstream, 0);
int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
switch (nodeType(pNode)) {
case TSDB_SQL_SELECT: {
return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan);
}
case TSDB_SQL_INSERT:
return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan);
default:
return TSDB_CODE_FAILED;
}
taosArrayDestroy(upstream);
return TSDB_CODE_SUCCESS;
}
......@@ -62,7 +86,7 @@ void destroyQueryPlan(SQueryPlanNode* pQueryNode) {
//======================================================================================================================
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) {
SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) {
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
pNode->info.type = type;
......@@ -123,7 +147,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
}
case QNODE_FILL: { // todo !!
pNode->pExtInfo = pExtInfo;
pNode->pExtInfo = (void*)pExtInfo;
break;
}
......@@ -145,7 +169,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
return pNode;
}
static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info,
static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info,
SArray* pExprs, SArray* tableCols) {
if (pQueryInfo->info.onlyTagQuery) {
int32_t num = (int32_t) taosArrayGetSize(pExprs);
......@@ -186,7 +210,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
return pNode;
}
static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
// group by column not by tag
size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
......@@ -239,7 +263,7 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer
memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr);
SArray* p = pQueryInfo->exprList[0]; // top expression in select clause
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p, taosArrayGetSize(p), pInfo);
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p->pData, taosArrayGetSize(p), pInfo);
}
if (pQueryInfo->order != NULL) {
......@@ -254,7 +278,7 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer
return pNode;
}
static SQueryPlanNode* doCreateQueryPlanForSingleTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
static SQueryPlanNode* doCreateQueryPlanForSingleTable(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
SArray* tableCols) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
......@@ -286,7 +310,7 @@ static bool isAllAggExpr(SArray* pList) {
return true;
}
SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
SArray* upstream = NULL;
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause
......
......@@ -14,6 +14,7 @@
*/
#include "plannerInt.h"
#include "exception.h"
#include "parser.h"
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
......@@ -33,6 +34,13 @@ static const char* gOpName[] = {
#undef INCLUDE_AS_NAME
};
static void* vailidPointer(void* p) {
if (NULL == p) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
return p;
}
const char* opTypeToOpName(int32_t type) {
return gOpName[type];
}
......@@ -46,6 +54,25 @@ int32_t opNameToOpType(const char* name) {
return OP_Unknown;
}
static SDataSink* initDataSink(int32_t type, int32_t size) {
SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size));
sink->info.type = type;
return sink;
}
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
return (SDataSink*)dispatcher;
}
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter));
inserter->numOfTables = pBlocks->numOfTables;
inserter->size = pBlocks->size;
SWAP(inserter->pData, pBlocks->pData, char*);
return (SDataSink*)inserter;
}
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
dataBlockSchema->numOfCols = pPlanNode->numOfCols;
dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols);
......@@ -72,10 +99,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
}
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)calloc(1, size);
if (NULL == node) {
return NULL;
}
SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size));
node->info.type = type;
node->info.name = opTypeToOpName(type);
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
......@@ -138,7 +162,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
}
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = calloc(1, sizeof(SSubplan));
SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan)));
subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId);
subplan->type = type;
......@@ -146,15 +170,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) {
pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
}
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
}
SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
} else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
......@@ -164,7 +188,17 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
return subplan;
}
static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SEpSet* epSet) {
epSet->inUse = 0; // todo
epSet->numOfEps = vg->numOfEps;
for (int8_t i = 0; i < vg->numOfEps; ++i) {
epSet->port[i] = vg->epAddr[i].port;
strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
}
return;
}
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
epSet->inUse = 0; // todo
epSet->numOfEps = vg->numOfEps;
for (int8_t i = 0; i < vg->numOfEps; ++i) {
......@@ -179,8 +213,9 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
return pCxt->nextId.templateId++;
......@@ -214,6 +249,9 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode);
break;
case QNODE_INSERT:
// Insert is not an operator in a physical plan.
break;
default:
assert(false);
}
......@@ -229,26 +267,46 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
return node;
}
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
size_t numOfVg = taosArrayGetSize(vgs);
for (int32_t i = 0; i < numOfVg; ++i) {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
subplan->pNode = NULL;
subplan->pDataSink = createDataInserter(pCxt, blocks);
}
}
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_INSERT == pRoot->info.type) {
splitInsertSubplan(pCxt, pRoot);
} else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
++(pCxt->nextId.templateId);
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
}
// todo deal subquery
}
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) {
TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = {
.pCatalog = pCatalog,
.pDag = calloc(1, sizeof(SQueryDag)),
.pDag = vailidPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL,
.nextId = {0} // todo queryid
};
if (NULL == context.pDag) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
context.pDag->pSubplans = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
createSubplanByLevel(&context, pQueryNode);
*pDag = context.pDag;
context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode);
} CATCH(code) {
CLEANUP_EXECUTE();
terrno = code;
return TSDB_CODE_FAILED;
} END_TRY
return TSDB_CODE_SUCCESS;
}
......
......@@ -651,7 +651,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
static const char* jkPnodeName = "Name";
static const char* jkPnodeTargets = "Targets";
static const char* jkPnodeConditions = "Conditions";
static const char* jkPnodeSchema = "Schema";
static const char* jkPnodeSchema = "InputSchema";
static const char* jkPnodeChildren = "Children";
// The 'pParent' field do not need to be serialized.
static bool phyNodeToJson(const void* obj, cJSON* jNode) {
......
......@@ -24,9 +24,9 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
// todo
}
int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag) {
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SEpSet* pEpSet, struct SQueryDag** pDag) {
SQueryPlanNode* logicPlan;
int32_t code = createQueryPlan(pQueryInfo, &logicPlan);
int32_t code = createQueryPlan(pNode, &logicPlan);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册