提交 013f8fed 编写于 作者: H Haojun Liao

[td-10564] 1. Enable and then refactor the parse the nested function in...

[td-10564] 1. Enable  and then refactor the parse the nested function in select clause. 2. Add some scalar string function in function module. 3. Add more test cases.
上级 55f3ac17
......@@ -221,7 +221,7 @@ typedef struct SScalarFunctionInfo {
char name[FUNCTIONS_NAME_MAX_LENGTH];
int8_t type; // scalar function or aggregation function
uint32_t functionId; // index of scalar function
void (*process)(const struct SScalarFuncParam *pInput, struct SScalarFuncParam* pOutput);
void (*process)(struct SScalarFuncParam* pOutput, size_t numOfInput, const struct SScalarFuncParam *pInput);
} SScalarFunctionInfo;
typedef struct SMultiFunctionsDesc {
......
......@@ -80,8 +80,7 @@ typedef struct SQueryStmtInfo {
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SArray** exprList; // SArray<SExprInfo*>
SLimit limit;
SLimit slimit;
STagCond tagCond;
......@@ -112,6 +111,7 @@ typedef struct SQueryStmtInfo {
struct SQueryStmtInfo *pDownstream;
int32_t havingFieldNum;
SMultiFunctionsDesc info;
int32_t exprListLevelIndex;
} SQueryStmtInfo;
typedef struct SColumnIndex {
......@@ -165,7 +165,7 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src);
void columnListCopy(SArray* dst, const SArray* src, uint64_t uid);
void columnListDestroy(SArray* pColumnList);
void dropAllExprInfo(SArray* pExprInfo);
void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel);
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, struct tExprNode* pParamExpr, SSchema* pResSchema, int16_t interSize);
int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
......
......@@ -140,8 +140,9 @@ do { \
#define FUNCTION_ROUND 4503
#define FUNCTION_LENGTH 4800
#define FUNCTION_LTRIM 4801
#define FUNCTION_RTRIM 4802
#define FUNCTION_CONCAT 4801
#define FUNCTION_LTRIM 4802
#define FUNCTION_RTRIM 4803
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
#define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER))
......
......@@ -12,10 +12,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <function.h>
#include "os.h"
#include "ttime.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "ttime.h"
#include "exception.h"
#include "executorimpl.h"
......@@ -161,7 +162,7 @@ int64_t genQueryId(void) {
static int32_t getExprFunctionId(SExprInfo *pExprInfo) {
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
return pExprInfo->pExpr->_node.functionId;
return pExprInfo->pExpr->_function.functionId;
}
static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) {
......@@ -1069,6 +1070,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo
}
static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
#if 0
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
......@@ -1079,7 +1081,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
if (pCtx[i].functionId == FUNCTION_ARITHM) {
// setArithParams((SScalarFunctionSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else {
SColIndex* pCol = &pOperator->pExpr[i].base.colInfo;
SColIndex* pCol = &pOperator->pExpr[i].base.pColumns->info.;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCtx[i].functionId == FUNCTION_BLKINFO) ||
(TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
......@@ -1087,7 +1089,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
// in case of the block distribution query, the inputBytes is not a constant value.
pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);
assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
if (pCtx[i].functionId < 0) {
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
......@@ -1112,7 +1114,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);
assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
for(int32_t j = 0; j < pBlock->info.rows; ++j) {
char* dst = p->pData + j * p->info.bytes;
taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
......@@ -1120,6 +1122,8 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
}
}
}
#endif
}
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
......@@ -1170,11 +1174,11 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
continue;
}
SColIndex * pColIndex = &pExpr[k].base.colInfo;
SColIndex * pColIndex = NULL/*&pExpr[k].base.colInfo*/;
int16_t index = pColIndex->colIndex;
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, index);
assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey);
// assert(pColInfo->info.colId == pColIndex->info.colId && curTs != windowKey);
double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) {
......@@ -1858,7 +1862,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
for (int32_t i = 0; i < numOfOutput; ++i) {
SSqlExpr *pSqlExpr = &pExpr[i].base;
SQLFunctionCtx* pCtx = &pFuncCtx[i];
#if 0
SColIndex *pIndex = &pSqlExpr->colInfo;
if (TSDB_COL_REQ_NULL(pIndex->flag)) {
......@@ -1867,7 +1871,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
} else {
pCtx->requireNull = false;
}
#endif
// pCtx->inputBytes = pSqlExpr->colBytes;
// pCtx->inputType = pSqlExpr->colType;
......@@ -2359,7 +2363,7 @@ bool onlyQueryTags(SQueryAttr* pQueryAttr) {
if (functionId != FUNCTION_TAGPRJ &&
functionId != FUNCTION_TID_TAG &&
(!(functionId == FUNCTION_COUNT && pExprInfo->base.pColumns->colId == TSDB_TBNAME_COLUMN_INDEX)) &&
(!(functionId == FUNCTION_COUNT && pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX)) &&
(!(functionId == FUNCTION_PRJ && TSDB_COL_IS_UD_COL(pExprInfo->base.pColumns->flag)))) {
return false;
}
......@@ -2879,7 +2883,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
int32_t numOfOutput = pTableScanInfo->numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
int32_t colId = pTableScanInfo->pExpr[i].base.pColumns->colId;
int32_t colId = pTableScanInfo->pExpr[i].base.pColumns->info.colId;
// group by + first/last should not apply the first/last block filter
if (functionId < 0) {
......@@ -3211,7 +3215,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
}
// todo use tag column index to optimize performance
doSetTagValueInParam(pTable, pLocalExprInfo->base.pColumns->colId, &pCtx[idx].tag, pLocalExprInfo->base.resSchema.type,
doSetTagValueInParam(pTable, pLocalExprInfo->base.pColumns->info.colId, &pCtx[idx].tag, pLocalExprInfo->base.resSchema.type,
pLocalExprInfo->base.resSchema.bytes);
if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resSchema.type)
......@@ -3787,7 +3791,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pExpr->colInfo.colId) {
if (pres->info.colId == pExpr->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
......@@ -3819,7 +3823,7 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pExpr1->colInfo.colId) {
if (pres->info.colId == pExpr1->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
......@@ -5041,7 +5045,7 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
SSqlExpr* pExpr = &pQuery->pExpr1[j].base;
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[j]);
if (index->colId == pExpr->colInfo.colId &&
if (index->colId == pExpr->pColumns->info.colId &&
(functionId == FUNCTION_PRJ || functionId == FUNCTION_TAG || functionId == FUNCTION_TS)) {
index->colIndex = j;
index->colId = pExpr->resSchema.colId;
......@@ -5073,8 +5077,8 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
// FUNCTION_TAG_DUMMY function needs to be ignored
if (index->colId == pExpr->pColumns->info.colId &&
((TSDB_COL_IS_TAG(pExpr->colInfo.flag) && functionId == FUNCTION_TAG) ||
(TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && functionId == FUNCTION_PRJ))) {
((TSDB_COL_IS_TAG(pExpr->pColumns->flag) && functionId == FUNCTION_TAG) ||
(TSDB_COL_IS_NORMAL_COL(pExpr->pColumns->flag) && functionId == FUNCTION_PRJ))) {
index->colIndex = j;
index->colId = pExpr->resSchema.colId;
found = true;
......@@ -5291,7 +5295,7 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData col = {{0}};
col.info.colId = pExpr[i].base.pColumns->colId;
col.info.colId = pExpr[i].base.pColumns->info.colId;
// col.info.bytes = pExpr[i].base.colBytes;
// col.info.type = pExpr[i].base.colType;
taosArrayPush(pDataBlock->pDataBlock, &col);
......@@ -6777,7 +6781,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
int16_t type = pExprInfo->base.resSchema.type;
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->colId) {
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
bytes = pQueryAttr->tagColList[i].bytes;
type = pQueryAttr->tagColList[i].type;
break;
......@@ -6809,10 +6813,10 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
output += sizeof(pQueryAttr->vgId);
char* data = NULL;
if (pExprInfo->base.pColumns->colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->colId, type, bytes);
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
}
doSetTagValueToResultBuf(output, data, type, bytes);
......@@ -6848,10 +6852,10 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
type = pExprInfo[j].base.resSchema.type;
bytes = pExprInfo[j].base.resSchema.bytes;
if (pExprInfo[j].base.pColumns->colId == TSDB_TBNAME_COLUMN_INDEX) {
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->colId, type, bytes);
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
}
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
......@@ -7047,20 +7051,20 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) {
int32_t j = 0;
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
if (TSDB_COL_IS_TAG(pExpr->pColumns->flag)) {
if (pExpr->pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
return TSDB_TBNAME_COLUMN_INDEX;
}
while(j < pTableInfo->numOfTags) {
if (pExpr->colInfo.colId == pTagCols[j].colId) {
if (pExpr->pColumns->info.colId == pTagCols[j].colId) {
return j;
}
j += 1;
}
} else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data
} /*else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data
return TSDB_UD_COLUMN_INDEX;
} else {
while (j < pTableInfo->numOfCols) {
......@@ -7070,7 +7074,7 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p
j += 1;
}
}
}*/
return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value
}
......@@ -7815,7 +7819,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu
// bytes = tDataTypes[type].bytes;
// } else {
// int32_t index = pExprs[i].base.colInfo.colIndex;
// assert(prevExpr[index].base.resSchema.colId == pExprs[i].base.pColumns->colId);
// assert(prevExpr[index].base.resSchema.colId == pExprs[i].base.pColumns->info.colId);
//
// type = prevExpr[index].base.resSchema.type;
// bytes = prevExpr[index].base.resSchema.bytes;
......@@ -7951,7 +7955,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
// }
// todo opt performance
SColIndex *pColIndex = &pSqlExprMsg->colInfo;
SColIndex *pColIndex = NULL;/*&pSqlExprMsg->colInfo;*/
if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
int32_t f = 0;
for (f = 0; f < pQueryAttr->numOfCols; ++f) {
......
......@@ -37,7 +37,7 @@ typedef struct SScalarFunctionSupport {
char** data;
} SScalarFunctionSupport;
extern struct SScalarFunctionInfo scalarFunc[5];
extern struct SScalarFunctionInfo scalarFunc[8];
int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarFuncParam* pOutput,
void* param, char* (*getSourceDataBlock)(void*, const char*, int32_t));
......
......@@ -9,8 +9,9 @@ static void assignBasicParaInfo(struct SScalarFuncParam* dst, const struct SScal
dst->num = src->num;
}
static void tceil(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
static void tceil(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
switch (pLeft->bytes) {
case TSDB_DATA_TYPE_FLOAT: {
......@@ -34,8 +35,9 @@ static void tceil(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
}
}
static void tfloor(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
static void tfloor(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
switch (pLeft->bytes) {
case TSDB_DATA_TYPE_FLOAT: {
......@@ -61,8 +63,9 @@ static void tfloor(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
}
}
static void _tabs(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
static void _tabs(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
switch (pLeft->bytes) {
case TSDB_DATA_TYPE_FLOAT: {
......@@ -118,8 +121,9 @@ static void _tabs(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
}
}
static void tround(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
static void tround(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
assignBasicParaInfo(pOutput, pLeft);
assert(numOfInput == 1);
switch (pLeft->bytes) {
case TSDB_DATA_TYPE_FLOAT: {
......@@ -143,7 +147,9 @@ static void tround(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
}
}
static void tlength(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
static void tlength(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
assert(numOfInput == 1);
int64_t* out = (int64_t*) pOutput->data;
char* s = pLeft->data;
......@@ -152,6 +158,46 @@ static void tlength(const SScalarFuncParam *pLeft, SScalarFuncParam* pOutput) {
}
}
static void tconcat(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
assert(numOfInput > 0);
int32_t rowLen = 0;
int32_t num = 1;
for(int32_t i = 0; i < numOfInput; ++i) {
rowLen += pLeft[i].bytes;
if (pLeft[i].num > 1) {
num = pLeft[i].num;
}
}
pOutput->data = realloc(pOutput->data, rowLen * num);
assert(pOutput->data);
char* rstart = pOutput->data;
for(int32_t i = 0; i < num; ++i) {
char* s = rstart;
varDataSetLen(s, 0);
for (int32_t j = 0; j < numOfInput; ++j) {
char* p1 = POINTER_SHIFT(pLeft[j].data, i * pLeft[j].bytes);
memcpy(varDataVal(s) + varDataLen(s), varDataVal(p1), varDataLen(p1));
varDataLen(s) += varDataLen(p1);
}
rstart += rowLen;
}
}
static void tltrim(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
}
static void trtrim(SScalarFuncParam* pOutput, size_t numOfInput, const SScalarFuncParam *pLeft) {
}
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
switch(type) {
case TSDB_DATA_TYPE_TINYINT:
......@@ -256,7 +302,6 @@ int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarFuncPa
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(pExprs->_node.optr);
SScalarFuncParam left = {0}, right = {0};
if (pLeft->nodeType == TEXPR_BINARYEXPR_NODE || pLeft->nodeType == TEXPR_UNARYEXPR_NODE) {
setScalarFuncParam(&left, leftOutput.type, leftOutput.bytes, leftOutput.data, leftOutput.num);
} else if (pLeft->nodeType == TEXPR_COL_NODE) {
......@@ -318,12 +363,15 @@ int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarFuncPa
return 0;
}
SScalarFunctionInfo scalarFunc[5] = {
SScalarFunctionInfo scalarFunc[8] = {
{"ceil", FUNCTION_TYPE_SCALAR, FUNCTION_CEIL, tceil},
{"floor", FUNCTION_TYPE_SCALAR, FUNCTION_FLOOR, tfloor},
{"abs", FUNCTION_TYPE_SCALAR, FUNCTION_ABS, _tabs},
{"round", FUNCTION_TYPE_SCALAR, FUNCTION_ROUND, tround},
{"length", FUNCTION_TYPE_SCALAR, FUNCTION_LENGTH, tlength},
{"concat", FUNCTION_TYPE_SCALAR, FUNCTION_CONCAT, tconcat},
{"ltrim", FUNCTION_TYPE_SCALAR, FUNCTION_LTRIM, tltrim},
{"rtrim", FUNCTION_TYPE_SCALAR, FUNCTION_RTRIM, trtrim},
};
void setScalarFunctionSupp(struct SScalarFunctionSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) {
......
......@@ -31,7 +31,7 @@ SSchema *getTableTagSchema(const STableMeta* pTableMeta);
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo);
SExprInfo* createBinaryExprInfo(struct tExprNode* pNode, SSchema* pResSchema);
void addExprInfo(SQueryStmtInfo* pQueryInfo, int32_t index, SExprInfo* pExprInfo);
void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t level);
void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize);
SExprInfo* getExprInfo(SQueryStmtInfo* pQueryInfo, int32_t index);
......
......@@ -19,7 +19,7 @@ SSchema* getTbnameColumnSchema() {
}
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprList);
return taosArrayGetSize(pQueryInfo->exprList[0]);
}
SSchema* getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
......@@ -104,7 +104,6 @@ SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SC
if (pParamExpr != NULL) {
pExpr->pExpr = createFunctionExprNode(functionId, NULL, pParamExpr, 1);
// pExpr->base.pColumns
// todo set the correct number of columns
} else if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
assert(pParamExpr == NULL);
......@@ -140,15 +139,17 @@ SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SC
return pExpr;
}
void addExprInfo(SQueryStmtInfo* pQueryInfo, int32_t index, SExprInfo* pExprInfo) {
assert(pQueryInfo != NULL && pQueryInfo->exprList != NULL);
void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t level) {
assert(pExprList != NULL );
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList);
int32_t num = (int32_t) taosArrayGetSize(pExprList);
if (index == num) {
taosArrayPush(pQueryInfo->exprList, &pExprInfo);
taosArrayPush(pExprList, &pExprInfo);
} else {
taosArrayInsert(pQueryInfo->exprList, index, &pExprInfo);
taosArrayInsert(pExprList, index, &pExprInfo);
}
printf("add function, id:%d, level:%d\n", pExprInfo->pExpr->_function.functionId, level);
}
void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize) {
......@@ -176,10 +177,10 @@ void destroyExprInfo(SExprInfo* pExprInfo) {
tfree(pExprInfo);
}
void dropAllExprInfo(SArray* pExprInfo) {
static void dropOneLevelExprInfo(SArray* pExprInfo) {
size_t size = taosArrayGetSize(pExprInfo);
for(int32_t i = 0; i < size; ++i) {
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
destroyExprInfo(pExpr);
}
......@@ -187,6 +188,12 @@ void dropAllExprInfo(SArray* pExprInfo) {
taosArrayDestroy(pExprInfo);
}
void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel) {
for(int32_t i = 0; i < numOfLevel; ++i) {
dropOneLevelExprInfo(pExprInfo[i]);
}
}
void addExprInfoParam(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) {
assert (pExpr != NULL || argument != NULL || bytes != 0);
......
......@@ -70,7 +70,7 @@ static SKeyword keywordTable[] = {
{"STAR", TK_STAR},
{"SLASH", TK_SLASH},
{"REM ", TK_REM},
{"CONCAT", TK_CONCAT},
{"||", TK_CONCAT},
{"UMINUS", TK_UMINUS},
{"UPLUS", TK_UPLUS},
{"BITNOT", TK_BITNOT},
......
......@@ -66,65 +66,65 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
}
}
TEST(testCase, planner_test) {
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf buf;
buf.len = 128;
buf.buf = msg;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
SQueryStmtInfo* pQueryInfo = createQueryInfo();
setTableMetaInfo(pQueryInfo, &req);
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList;
ASSERT_EQ(taosArrayGetSize(pExprList), 2);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
ASSERT_EQ(p1->base.pColumns->uid, 110);
ASSERT_EQ(p1->base.numOfParams, 1);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_NORMAL);
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.interBytes, 16);
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
ASSERT_EQ(p1->pExpr->_function.functionId, FUNCTION_TOP);
ASSERT_TRUE(p1->pExpr->_node.pRight == NULL);
tExprNode* pParam = p1->pExpr->_node.pLeft;
ASSERT_EQ(pParam->nodeType, TEXPR_BINARYEXPR_NODE);
ASSERT_EQ(pParam->_node.optr, TSDB_BINARY_OP_DIVIDE);
ASSERT_EQ(pParam->_node.pLeft->nodeType, TEXPR_BINARYEXPR_NODE);
ASSERT_EQ(pParam->_node.pRight->nodeType, TEXPR_VALUE_NODE);
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr;
code = qCreateQueryPlan(pQueryInfo, &n);
char* str = NULL;
qQueryPlanToString(n, &str);
printf("%s\n", str);
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);
destroySqlInfo(&info1);
}
\ No newline at end of file
//TEST(testCase, planner_test) {
// SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
// ASSERT_EQ(info1.valid, true);
//
// char msg[128] = {0};
// SMsgBuf buf;
// buf.len = 128;
// buf.buf = msg;
//
// SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
// ASSERT_EQ(code, 0);
//
// SMetaReq req = {0};
// int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
// ASSERT_EQ(ret, 0);
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
//
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
//
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
// ASSERT_EQ(ret, 0);
//
// SArray* pExprList = pQueryInfo->exprList[0];
// ASSERT_EQ(taosArrayGetSize(pExprList), 2);
//
// SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
// ASSERT_EQ(p1->base.pColumns->uid, 110);
// ASSERT_EQ(p1->base.numOfParams, 1);
// ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
// ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
// ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_NORMAL);
// ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
// ASSERT_EQ(p1->base.interBytes, 16);
//
// ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
// ASSERT_EQ(p1->pExpr->_function.functionId, FUNCTION_TOP);
// ASSERT_TRUE(p1->pExpr->_node.pRight == NULL);
//
// tExprNode* pParam = p1->pExpr->_node.pLeft;
//
// ASSERT_EQ(pParam->nodeType, TEXPR_BINARYEXPR_NODE);
// ASSERT_EQ(pParam->_node.optr, TSDB_BINARY_OP_DIVIDE);
// ASSERT_EQ(pParam->_node.pLeft->nodeType, TEXPR_BINARYEXPR_NODE);
// ASSERT_EQ(pParam->_node.pRight->nodeType, TEXPR_VALUE_NODE);
//
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
//
// struct SQueryPlanNode* n = nullptr;
// code = qCreateQueryPlan(pQueryInfo, &n);
//
// char* str = NULL;
// qQueryPlanToString(n, &str);
// printf("%s\n", str);
//
// destroyQueryInfo(pQueryInfo);
// qParserClearupMetaRequestInfo(&req);
// destroySqlInfo(&info1);
//}
\ No newline at end of file
......@@ -226,9 +226,9 @@ static SQueryPlanNode* doCreateQueryPlanForOneTableImpl(SQueryStmtInfo* pQueryIn
}
if (pQueryInfo->havingFieldNum > 0 || pQueryInfo->info.arithmeticOnAgg) {
int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
pNode =
createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL);
// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
// pNode =
// createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL);
}
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
......@@ -292,7 +292,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
SArray* exprList = taosArrayInit(4, POINTER_BYTES);
if (copyExprInfoList(exprList, pQueryInfo->exprList, uid, true) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
dropAllExprInfo(exprList);
// dropAllExprInfo(exprList);
exit(-1);
}
......@@ -308,7 +308,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
// 4. add the projection query node
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList);
columnListDestroy(tableColumnList);
dropAllExprInfo(exprList);
// dropAllExprInfo(exprList);
taosArrayPush(upstream, &pNode);
}
......@@ -316,7 +316,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
SQueryTableInfo info = {0};
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList);
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
pQueryInfo->exprList->pData, num, &info, NULL);
pQueryInfo->exprList[0]->pData, num, &info, NULL);
// 4. add the aggregation or projection execution node
pNode = doCreateQueryPlanForOneTableImpl(pQueryInfo, pNode, &info, pQueryInfo->exprList);
......@@ -338,7 +338,7 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree(pQueryNode->info.name);
tfree(pQueryNode->tableInfo.tableName);
dropAllExprInfo(pQueryNode->pExpr);
// dropAllExprInfo(pQueryNode->pExpr);
if (pQueryNode->pPrevNodes != NULL) {
int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pPrevNodes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册