提交 266b878b 编写于 作者: H Haojun Liao

[td-3299]

上级 c36a71e8
......@@ -514,7 +514,8 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx
int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -48,6 +48,7 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, in
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
......
......@@ -167,6 +167,8 @@ static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, S
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static bool hasMainOutput(SQueryAttr *pQueryAttr);
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, STableQueryInfo *pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
......@@ -1765,12 +1767,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case OP_Filter: { // todo refactor
if (pQueryAttr->stableQuery) {
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3);
int32_t numOfFilterCols = 0;
if (pQueryAttr->numOfFilterCols > 0) {
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols);
} else {
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pQueryAttr->stableQuery) {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
} else {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
}
}
break;
}
......@@ -5409,38 +5424,37 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator;
}
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput) {
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo));
{
SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo));
int32_t numOfFilter = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
if (pExpr[i].base.flist.numOfFilters > 0) {
numOfFilter += 1;
}
int32_t numOfFilter = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
if (pExpr[i].base.flist.numOfFilters > 0) {
numOfFilter += 1;
}
pCols[i].type = pExpr[i].base.resType;
pCols[i].bytes = pExpr[i].base.resBytes;
pCols[i].colId = pExpr[i].base.resColId;
pCols[i].type = pExpr[i].base.resType;
pCols[i].bytes = pExpr[i].base.resBytes;
pCols[i].colId = pExpr[i].base.resColId;
pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters;
pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo));
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo));
}
pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters;
pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo));
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo));
}
assert(numOfFilter > 0);
assert(numOfFilter > 0);
doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0);
pInfo->numOfFilterCols = numOfFilter;
*numOfFilterCols = numOfFilter;
return pCols;
}
for(int32_t i = 0; i < numOfOutput; ++i) {
tfree(pCols[i].flist.filterInfo);
}
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) {
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
tfree(pCols);
}
assert(numOfFilter > 0 && pCols != NULL);
doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0);
pInfo->numOfFilterCols = numOfFilter;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -7097,6 +7111,20 @@ void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) {
return NULL;
}
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
if (pColumnInfo != NULL) {
assert(numOfCols > 0);
for (int32_t i = 0; i < numOfCols; i++) {
freeColumnFilterInfo(pColumnInfo[i].flist.filterInfo, pColumnInfo[i].flist.numOfFilters);
}
tfree(pColumnInfo);
}
return NULL;
}
void freeQInfo(SQInfo *pQInfo) {
if (!isValidQInfo(pQInfo)) {
return;
......@@ -7281,13 +7309,7 @@ void freeQueryAttr(SQueryAttr* pQueryAttr) {
tfree(pQueryAttr->tagColList);
tfree(pQueryAttr->pFilterInfo);
if (pQueryAttr->tableCols != NULL) {
for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) {
SColumnInfo* column = pQueryAttr->tableCols + i;
freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters);
}
tfree(pQueryAttr->tableCols);
}
pQueryAttr->tableCols = freeColumnInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols);
if (pQueryAttr->pGroupbyExpr != NULL) {
taosArrayDestroy(pQueryAttr->pGroupbyExpr->columnInfo);
......
......@@ -615,8 +615,13 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
}
} else { // diff/add/multiply/subtract/division
op = OP_Arithmetic;
taosArrayPush(plan, &op);
if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->vgId == 0) { // todo refactor
op = OP_Filter;
taosArrayPush(plan, &op);
} else {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
}
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册