提交 f80294fc 编写于 作者: H Haojun Liao

[td-10564] Support two columns in the aggregate function.

上级 6d2ecea1
......@@ -78,6 +78,8 @@ extern "C" {
#define FUNCTION_MODE 36
#define FUNCTION_SAMPLE 37
#define FUNCTION_COV 38
// determine the real data need to calculated the result
enum {
BLK_DATA_NO_NEEDED = 0x0,
......
......@@ -166,7 +166,16 @@ void columnListCopy(SArray* dst, const SArray* src, uint64_t uid);
void columnListDestroy(SArray* pColumnList);
void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel);
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, struct tExprNode* pParamExpr, SSchema* pResSchema, int16_t interSize);
typedef struct SSourceParam {
// struct tExprNode **pExprNode;
SArray *pExprNodeList; //Array<struct tExprNode*>
// SColumn *pCols;
SArray *pColumnList; //Array<struct SColumn>
int32_t num;
} SSourceParam;
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SSourceParam* pSource, SSchema* pResSchema, int16_t interSize);
int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
......
......@@ -28,7 +28,7 @@ extern "C" {
#include "function.h"
#include "tudf.h"
extern SAggFunctionInfo aggFunc[34];
extern SAggFunctionInfo aggFunc[35];
typedef struct SResultRowEntryInfo {
int8_t hasResult; // result generated, not NULL value
......
......@@ -4379,7 +4379,7 @@ int32_t functionCompatList[] = {
6, 8, 7,
};
SAggFunctionInfo aggFunc[34] = {{
SAggFunctionInfo aggFunc[35] = {{
// 0, count function does not invoke the finalize function
"count",
FUNCTION_TYPE_AGG,
......@@ -4820,4 +4820,18 @@ SAggFunctionInfo aggFunc[34] = {{
blockinfo_func_finalizer,
block_func_merge,
dataBlockRequired,
}};
},
{
// 34
"cov", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_COV,
FUNCTION_COV,
FUNCSTATE_SO | FUNCSTATE_STABLE,
function_setup,
sum_function,
function_finalizer,
sum_func_merge,
statisRequired,
}
};
......@@ -39,6 +39,9 @@ extern "C" {
TAOS_FIELD createField(const SSchema* pSchema);
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
SSourceParam addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
SInternalField* insertFieldInfo(SFieldInfo* pFieldInfo, int32_t index, SSchema* field);
int32_t getNumOfFields(SFieldInfo* pFieldInfo);
......
......@@ -28,7 +28,8 @@ int32_t getNumOfTags(const STableMeta* pTableMeta);
SSchema *getTableColumnSchema(const STableMeta *pTableMeta);
SSchema *getTableTagSchema(const STableMeta* pTableMeta);
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo);
SArray *getCurrentExprList(SQueryStmtInfo* pQueryInfo);
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo);
SExprInfo* createBinaryExprInfo(struct tExprNode* pNode, SSchema* pResSchema);
void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t level);
......
......@@ -527,6 +527,44 @@ void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t fla
}
}
SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) {
SColumn c;
c.uid = uid;
c.flag = flag;
c.info.colId = pSchema->colId;
c.info.bytes = pSchema->bytes;
c.info.type = pSchema->type;
if (tableName != NULL) {
snprintf(c.name, tListLen(c.name), "%s.%s", tableName, pSchema->name);
} else {
tstrncpy(c.name, pSchema->name, tListLen(c.name));
}
return c;
}
SSourceParam addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn) {
assert(pSourceParam != NULL);
pSourceParam->num += 1;
if (pSourceParam->pExprNodeList != NULL) {
assert(pNode != NULL && pColumn == NULL);
if (pSourceParam->pExprNodeList == NULL) {
pSourceParam->pExprNodeList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pSourceParam->pExprNodeList, &pNode);
} else {
assert(pColumn != NULL);
if (pSourceParam->pColumnList == NULL) {
pSourceParam->pColumnList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pSourceParam->pColumnList, &pColumn);
}
}
int32_t getNumOfFields(SFieldInfo* pFieldInfo) {
return pFieldInfo->numOfOutput;
}
......
......@@ -18,8 +18,14 @@ SSchema* getTbnameColumnSchema() {
return &_s;
}
SArray* getCurrentExprList(SQueryStmtInfo* pQueryInfo) {
assert(pQueryInfo != NULL && pQueryInfo->exprListLevelIndex >= 0 && pQueryInfo->exprListLevelIndex < 10);
return pQueryInfo->exprList[pQueryInfo->exprListLevelIndex];
}
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprList[0]);
SArray* pExprList = getCurrentExprList(pQueryInfo);
return taosArrayGetSize(pExprList);
}
SSchema* getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
......@@ -55,21 +61,29 @@ SSchema* getTableTagSchema(const STableMeta* pTableMeta) {
return getOneColumnSchema(pTableMeta, getTableInfo(pTableMeta).numOfColumns);
}
static tExprNode* createFunctionExprNode(int32_t functionId, SSchema* pSchema, tExprNode* pColumnNode, int32_t numOfCols) {
if (pColumnNode == NULL) {
pColumnNode = calloc(1, sizeof(tExprNode));
pColumnNode->nodeType = TEXPR_COL_NODE;
pColumnNode->pSchema = calloc(1, sizeof(SSchema));
memcpy(pColumnNode->pSchema, pSchema, sizeof(SSchema));
static tExprNode* createFunctionExprNode(int32_t functionId, struct SSourceParam *pParam) {//SSchema* pSchema, tExprNode* pColumnNode, int32_t numOfCols) {
tExprNode** p = malloc(pParam->num * POINTER_BYTES);
if (pParam->pColumnList != NULL) {
for(int32_t i = 0; i < pParam->num; ++i) {
p[i] = calloc(1, sizeof(tExprNode));
p[i]->nodeType = TEXPR_COL_NODE;
p[i]->pSchema = calloc(1, sizeof(SSchema));
memcpy(p[i]->pSchema, taosArrayGetP(pParam->pColumnList, i), sizeof(SSchema));
}
} else {
assert(pSchema == NULL);
assert(pParam->pColumnList == NULL);
for(int32_t i = 0; i < pParam->num; ++i) {
p[i] = taosArrayGetP(pParam->pExprNodeList, i);
}
}
tExprNode* pNode = calloc(1, sizeof(tExprNode));
pNode->nodeType = TEXPR_FUNCTION_NODE;
pNode->_function.functionId = functionId;
pNode->_function.pChild = pColumnNode;
pNode->_function.num = numOfCols;
pNode->_function.pChild = p;
pNode->_function.num = pParam->num;
return pNode;
}
......@@ -87,7 +101,7 @@ SExprInfo* createBinaryExprInfo(tExprNode* pNode, SSchema* pResSchema) {
return pExpr;
}
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, tExprNode* pParamExpr, SSchema* pResSchema, int16_t interSize) {
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SSourceParam* pSourceParam, SSchema* pResSchema, int16_t interSize) {
SExprInfo* pExpr = calloc(1, sizeof(SExprInfo));
if (pExpr == NULL) {
return NULL;
......@@ -99,43 +113,36 @@ SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, int16_t functionId, SC
}
SSqlExpr* p = &pExpr->base;
p->pColumns = calloc(1, sizeof(SColumn));
p->numOfCols = 1;
if (pParamExpr != NULL) {
pExpr->pExpr = createFunctionExprNode(functionId, NULL, pParamExpr, 1);
// todo set the correct number of columns
} else if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
assert(pParamExpr == NULL);
p->pColumns = calloc(pSourceParam->num, sizeof(SColumn));
p->numOfCols = pSourceParam->num;
p->interBytes = interSize;
memcpy(&p->resSchema, pResSchema, sizeof(SSchema));
if (pSourceParam->pExprNodeList != NULL) {
pExpr->pExpr = createFunctionExprNode(functionId, pSourceParam);
return pExpr;
}
SColumn* pCol = taosArrayGetP(pSourceParam->pColumnList, 0);
if (pCol->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
assert(pSourceParam->num == 1);
SSchema* s = getTbnameColumnSchema();
setColumn(p->pColumns, uid, pTableMetaInfo->aliasName, TSDB_COL_TAG, s);
pExpr->pExpr = createFunctionExprNode(functionId, s, pParamExpr, 1);
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX || functionId == FUNCTION_BLKINFO) {
assert(pParamExpr == NULL);
pExpr->pExpr = createFunctionExprNode(functionId, pSourceParam);
} else if (TSDB_COL_IS_UD_COL(pCol->flag) || functionId == FUNCTION_BLKINFO) {
setColumn(p->pColumns, uid, pTableMetaInfo->aliasName, TSDB_COL_UDC, pResSchema);
SSchema s = createSchema(pResSchema->type, pResSchema->bytes, pColIndex->columnIndex, pResSchema->name);
pExpr->pExpr = createFunctionExprNode(functionId, &s, pParamExpr, 1);
pExpr->pExpr = createFunctionExprNode(functionId, pSourceParam);
} else {
if (TSDB_COL_IS_TAG(pColIndex->type)) {
SSchema* pSchema = getTableTagSchema(pTableMetaInfo->pTableMeta);
setColumn(p->pColumns, uid, pTableMetaInfo->aliasName, TSDB_COL_TAG, &pSchema[pColIndex->columnIndex]);
pExpr->pExpr = createFunctionExprNode(functionId, &pSchema[pColIndex->columnIndex], pParamExpr, 1);
} else if (pTableMetaInfo->pTableMeta != NULL) {
// in handling select database/version/server_status(), the pTableMeta is NULL
SSchema* pSchema = getOneColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->columnIndex);
setColumn(p->pColumns, uid, pTableMetaInfo->aliasName, TSDB_COL_NORMAL, pSchema);
pExpr->pExpr = createFunctionExprNode(functionId, pSchema, pParamExpr, 1);
for(int32_t i = 0; i < pSourceParam->num; ++i) {
SColumn* c = taosArrayGetP(pSourceParam->pColumnList, i);
p->pColumns[i] = *c;
}
pExpr->pExpr = createFunctionExprNode(functionId, pSourceParam);
}
p->pColumns->flag = pColIndex->type;
p->interBytes = interSize;
memcpy(&p->resSchema, pResSchema, sizeof(SSchema));
return pExpr;
}
......@@ -149,7 +156,7 @@ void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t
taosArrayInsert(pExprList, index, &pExprInfo);
}
printf("add function, id:%d, level:%d\n", pExprInfo->pExpr->_function.functionId, level);
printf("add function, id:%d, level:%d, total:%ld\n", pExprInfo->pExpr->_function.functionId, level, taosArrayGetSize(pExprList));
}
void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize) {
......
......@@ -394,14 +394,14 @@ void sqlCheck(const char* sql, bool valid) {
//}
TEST(testCase, function_Test10) {
// sqlCheck("select c from `t.1abc`", true);
// sqlCheck("select length(c) from `t.1abc`", true);
// sqlCheck("select sum(length(a+b)) from `t.1abc`", true);
// sqlCheck("select sum(sum(a+b)) from `t.1abc`", false);
// sqlCheck("select sum(length(a) + length(b)) from `t.1abc`", true);
// sqlCheck("select length(sum(a) + sum(b)) + length(sum(a) + sum(b)) from `t.1abc`", true);
// sqlCheck("select sum(length(sum(a))) from `t.1abc`", true);
sqlCheck("select concat(a,b) from `t.1abc`", true);
sqlCheck("select c from `t.1abc`", true);
sqlCheck("select length(c) from `t.1abc`", true);
sqlCheck("select sum(length(a+b)) from `t.1abc`", true);
sqlCheck("select sum(sum(a+b)) from `t.1abc`", false);
sqlCheck("select sum(length(a) + length(b)) from `t.1abc`", true);
sqlCheck("select length(sum(a) + sum(b)) + length(sum(a) + sum(b)) from `t.1abc`", true);
sqlCheck("select sum(length(sum(a))) from `t.1abc`", true);
sqlCheck("select cov(a, b) from `t.1abc`", true);
// sqlCheck("select concat(concat(a,b), concat(a,b)) from `t.1abc`", true);
// sqlCheck("select length(length(length(a))) from `t.1abc`", true);
}
......
......@@ -191,7 +191,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
SSchema* pSchema = getOneColumnSchema(pTableMetaInfo->pTableMeta, i);
SSchema resultSchema = *pSchema;
SExprInfo* p = createExprInfo(pTableMetaInfo1, FUNCTION_PRJ, &index, NULL, &resultSchema, 0);
SExprInfo* p = NULL;//createExprInfo(pTableMetaInfo1, FUNCTION_PRJ, &index, NULL, &resultSchema, 0);
pExpr[i] = p;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册