提交 4e4e7b24 编写于 作者: H Haojun Liao

[td-2895]enable arithmetic calculation for super table.

上级 a0c21661
...@@ -3278,8 +3278,9 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf ...@@ -3278,8 +3278,9 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
for (int32_t i = 0; i < pQueryAttr->numOfExpr2; ++i) { for (int32_t i = 0; i < pQueryAttr->numOfExpr2; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SExprInfo* pExpr = pField->pExpr; SExprInfo* pExpr = pField->pExpr;
// SExprInfo *pExpr = &pQueryAttr->pExpr3[i];
SSqlExpr* pse = &pQueryAttr->pExpr2[i].base; SSqlExpr *pse = &pQueryAttr->pExpr2[i].base;
pse->uid = pTableMetaInfo->pTableMeta->id.uid; pse->uid = pTableMetaInfo->pTableMeta->id.uid;
pse->resColId = pExpr->base.resColId; pse->resColId = pExpr->base.resColId;
...@@ -3295,10 +3296,16 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf ...@@ -3295,10 +3296,16 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
} }
pse->colInfo.flag = TSDB_COL_NORMAL; pse->colInfo.flag = TSDB_COL_NORMAL;
pse->colType = pExpr->base.resType;
pse->colBytes = pExpr->base.resBytes;
pse->resType = pExpr->base.resType; pse->resType = pExpr->base.resType;
pse->resBytes = pExpr->base.resBytes; pse->resBytes = pExpr->base.resBytes;
// TODO restore refactor
int32_t inter = 0;
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, pExpr->base.functionId, 0, &pse->resType,
&pse->resBytes, &inter, 0, false);
pse->colType = pse->resType;
pse->colBytes = pse->resBytes;
} else { // arithmetic expression } else { // arithmetic expression
pse->colInfo.colId = pExpr->base.colInfo.colId; pse->colInfo.colId = pExpr->base.colInfo.colId;
pse->colType = pExpr->base.colType; pse->colType = pExpr->base.colType;
...@@ -3311,6 +3318,7 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf ...@@ -3311,6 +3318,7 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
tVariantAssign(&pse->param[j], &pExpr->base.param[j]); tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
buildArithmeticExprFromMsg(&pQueryAttr->pExpr2[i], NULL);
} }
} }
} }
...@@ -3321,7 +3329,7 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf ...@@ -3321,7 +3329,7 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo) { static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo) {
assert(tscIsTwoStageSTableQuery(pQueryInfo, 0)); assert(tscIsTwoStageSTableQuery(pQueryInfo, 0));
pQueryAttr->numOfExpr3 = tscNumOfFields(pQueryInfo); pQueryAttr->numOfExpr3 = tscSqlExprNumOfExprs(pQueryInfo);
pQueryAttr->pExpr3 = calloc(pQueryAttr->numOfExpr3, sizeof(SExprInfo)); pQueryAttr->pExpr3 = calloc(pQueryAttr->numOfExpr3, sizeof(SExprInfo));
if (pQueryAttr->pExpr3 == NULL) { if (pQueryAttr->pExpr3 == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3343,6 +3351,7 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu ...@@ -3343,6 +3351,7 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
tVariantAssign(&pse->param[j], &pExpr->base.param[j]); tVariantAssign(&pse->param[j], &pExpr->base.param[j]);
} }
} }
{ {
for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) { for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) {
SExprInfo* pExpr = &pQueryAttr->pExpr1[i]; SExprInfo* pExpr = &pQueryAttr->pExpr1[i];
...@@ -3365,8 +3374,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu ...@@ -3365,8 +3374,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
functionId = TSDB_FUNC_STDDEV; functionId = TSDB_FUNC_STDDEV;
} }
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, &pse->resBytes, &inter,
&pse->resBytes, &inter, 0, false); 0, false);
} }
} }
...@@ -3470,17 +3479,17 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -3470,17 +3479,17 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->colList[i].numOfFilters); pQueryAttr->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->colList[i].numOfFilters);
} }
// global aggregate query
if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
createGlobalAggregateExpr(pQueryAttr, pQueryInfo);
}
// for simple table, not for super table // for simple table, not for super table
int32_t code = createSecondaryExpr(pQueryAttr, pQueryInfo, pTableMetaInfo); int32_t code = createSecondaryExpr(pQueryAttr, pQueryInfo, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// global aggregate query
if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
createGlobalAggregateExpr(pQueryAttr, pQueryInfo);
}
// tag column info // tag column info
code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo); code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -522,6 +522,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara ...@@ -522,6 +522,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf);
int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool isQueryKilled(SQInfo *pQInfo); bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
......
...@@ -913,7 +913,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, ...@@ -913,7 +913,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
// in case of the block distribution query, the inputBytes is not a constant value. // in case of the block distribution query, the inputBytes is not a constant value.
pCtx[i].pInput = p->pData; pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);// && pCtx[i].inputBytes == p->info.bytes); assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);
uint32_t status = aAggs[pCtx[i].functionId].status; uint32_t status = aAggs[pCtx[i].functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
...@@ -3936,6 +3936,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts ...@@ -3936,6 +3936,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts
// create runtime environment // create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables; int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param); code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -5890,24 +5891,24 @@ _cleanup: ...@@ -5890,24 +5891,24 @@ _cleanup:
return code; return code;
} }
static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg) { int32_t buildArithmeticExprFromMsg(SExprInfo *pExprInfo, void *pQueryMsg) {
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg); qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
tExprNode* pExprNode = NULL; tExprNode* pExprNode = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) { TRY(TSDB_MAX_TAG_CONDITIONS) {
pExprNode = exprTreeFromBinary(pArithExprInfo->base.param[0].pz, pArithExprInfo->base.param[0].nLen); pExprNode = exprTreeFromBinary(pExprInfo->base.param[0].pz, pExprInfo->base.param[0].nLen);
} CATCH( code ) { } CATCH( code ) {
CLEANUP_EXECUTE(); CLEANUP_EXECUTE();
qError("qmsg:%p failed to create arithmetic expression string from:%s, reason: %s", pQueryMsg, pArithExprInfo->base.param[0].pz, tstrerror(code)); qError("qmsg:%p failed to create arithmetic expression string from:%s, reason: %s", pQueryMsg, pExprInfo->base.param[0].pz, tstrerror(code));
return code; return code;
} END_TRY } END_TRY
if (pExprNode == NULL) { if (pExprNode == NULL) {
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.param[0].pz); qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExprInfo->base.param[0].pz);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pArithExprInfo->pExpr = pExprNode; pExprInfo->pExpr = pExprNode;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -136,12 +136,6 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { ...@@ -136,12 +136,6 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
int32_t op = OP_MultiwaySort; int32_t op = OP_MultiwaySort;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
// fill operator
if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) {
op = OP_Fill;
taosArrayPush(plan, &op);
}
// arithmetic operator // arithmetic operator
if (!pQueryAttr->simpleAgg && pQueryAttr->interval.interval == 0) { if (!pQueryAttr->simpleAgg && pQueryAttr->interval.interval == 0) {
op = OP_Arithmetic; op = OP_Arithmetic;
...@@ -149,6 +143,17 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { ...@@ -149,6 +143,17 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
} else { } else {
op = OP_GlobalAggregate; op = OP_GlobalAggregate;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
}
// fill operator
if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) {
op = OP_Fill;
taosArrayPush(plan, &op);
} }
// limit/offset operator // limit/offset operator
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册