提交 0271413a 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 61b351a1
...@@ -295,19 +295,8 @@ typedef struct SMultiFunctionsDesc { ...@@ -295,19 +295,8 @@ typedef struct SMultiFunctionsDesc {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable); bool isSuperTable);
/**
* If the given name is a valid built-in sql function, the value of true will be returned.
* @param name
* @param len
* @return
*/
int32_t qIsBuiltinFunction(const char* name, int32_t len, bool* scalarFunction);
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId); bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
bool qIsAggregateFunction(const char* functionName);
bool qIsSelectivityFunction(const char* functionName);
tExprNode* exprTreeFromBinary(const void* data, size_t size); tExprNode* exprTreeFromBinary(const void* data, size_t size);
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc); void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
......
...@@ -328,7 +328,7 @@ typedef struct SOperatorInfo { ...@@ -328,7 +328,7 @@ typedef struct SOperatorInfo {
char* name; // name, used to show the query execution plan char* name; // name, used to show the query execution plan
void* info; // extension attribution void* info; // extension attribution
SExprInfo* pExpr; SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost; SOperatorCostInfo cost;
...@@ -515,7 +515,9 @@ typedef struct SAggOperatorInfo { ...@@ -515,7 +515,9 @@ typedef struct SAggOperatorInfo {
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SSDataBlock* existDataBlock; SSDataBlock *existDataBlock;
int32_t threshold;
bool hasVarCol;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SLimitOperatorInfo { typedef struct SLimitOperatorInfo {
...@@ -646,16 +648,17 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock ...@@ -646,16 +648,17 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
......
...@@ -1235,26 +1235,15 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction ...@@ -1235,26 +1235,15 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
} }
} }
static void projectApplyFunctions(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx, int32_t numOfOutput) { static void projectApplyFunctions(SqlFunctionCtx *pCtx, int32_t numOfOutput) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pQueryAttr->window.skey;
// Always set the asc order for merge stage process // Always set the asc order for merge stage process
if (pCtx[k].currentStage == MERGE_STAGE) { if (pCtx[k].currentStage == MERGE_STAGE) {
pCtx[k].order = TSDB_ORDER_ASC; pCtx[k].order = TSDB_ORDER_ASC;
} }
// pCtx[k].fpSet.process(&pCtx[k]);
pCtx[k].startTs = pQueryAttr->window.skey;
if (pCtx[k].functionId < 0) {
// load the script and exec
// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
// } else {
// aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); // aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
} // }
} }
} }
...@@ -2040,9 +2029,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI ...@@ -2040,9 +2029,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI
return pFuncCtx; return pFuncCtx;
} }
static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset) { static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx)); SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pFuncCtx == NULL) { if (pFuncCtx == NULL) {
return NULL; return NULL;
...@@ -2055,7 +2042,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC ...@@ -2055,7 +2042,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
} }
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); SExprInfo* pExpr = &pExprInfo[i];
SExprBasicInfo *pFunct = &pExpr->base; SExprBasicInfo *pFunct = &pExpr->base;
SqlFunctionCtx* pCtx = &pFuncCtx[i]; SqlFunctionCtx* pCtx = &pFuncCtx[i];
...@@ -3411,11 +3398,11 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput) ...@@ -3411,11 +3398,11 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput)
int32_t functionId = pCtx[i].functionId; int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) { if (functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) {
needCopyTs = true; needCopyTs = true;
if (i > 0 && pCtx[i-1].functionId == FUNCTION_TS_DUMMY){ if (i > 0 && pCtx[i-1].functionId == FUNCTION_TS_DUMMY) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data
src = pColRes->pData; src = pColRes->pData;
} }
}else if(functionId == FUNCTION_TS_DUMMY) { } else if(functionId == FUNCTION_TS_DUMMY) {
tsNum++; tsNum++;
} }
} }
...@@ -6011,7 +5998,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -6011,7 +5998,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) {
return doMerge(pOperator); return doMerge(pOperator);
} }
static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { static SArray* createBlockOrder(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal) {
SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
size_t numOfOrder = taosArrayGetSize(pOrderVal); size_t numOfOrder = taosArrayGetSize(pOrderVal);
...@@ -6020,8 +6007,8 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { ...@@ -6020,8 +6007,8 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
SOrder* pOrder = taosArrayGet(pOrderVal, j); SOrder* pOrder = taosArrayGet(pOrderVal, j);
orderInfo.order = pOrder->order; orderInfo.order = pOrder->order;
for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = taosArrayGet(pExprInfo, i); SExprInfo* pExpr = &pExprInfo[i];
if (pExpr->base.resSchema.colId == pOrder->col.colId) { if (pExpr->base.resSchema.colId == pOrder->col.colId) {
orderInfo.colIndex = i; orderInfo.colIndex = i;
break; break;
...@@ -6034,7 +6021,7 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { ...@@ -6034,7 +6021,7 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
return pOrderInfo; return pOrderInfo;
} }
static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) { static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) {
if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) { if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
return 0; return 0;
} }
...@@ -6050,8 +6037,8 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO ...@@ -6050,8 +6037,8 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO
size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo); size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo);
for(int32_t i = 0; i < numOfGroupCol; ++i) { for(int32_t i = 0; i < numOfGroupCol; ++i) {
SColumn* pCol = taosArrayGet(pGroupInfo, i); SColumn* pCol = taosArrayGet(pGroupInfo, i);
for(int32_t j = 0; j < taosArrayGetSize(pExprInfo); ++j) { for(int32_t j = 0; j < numOfCols; ++j) {
SExprInfo* pe = taosArrayGet(pExprInfo, j); SExprInfo* pe = &pExprInfo[j];
if (pe->base.resSchema.colId == pCol->colId) { if (pe->base.resSchema.colId == pCol->colId) {
taosArrayPush(plist, pCol); taosArrayPush(plist, pCol);
taosArrayPush(pInfo->groupInfo, &j); taosArrayPush(pInfo->groupInfo, &j);
...@@ -6082,29 +6069,27 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO ...@@ -6082,29 +6069,27 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) {
SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo)); SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
int32_t numOfOutput = taosArrayGetSize(pExprInfo); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) { if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) {
goto _error; goto _error;
} }
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput); int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
code = initGroupCol(pExprInfo, pGroupInfo, pInfo); code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -6113,7 +6098,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t ...@@ -6113,7 +6098,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
// pRuntimeEnv->pQueryAttr->topBotQuery, false)); // pRuntimeEnv->pQueryAttr->topBotQuery, false));
pInfo->sortBufSize = 1024 * 16; // 1MB pInfo->sortBufSize = 1024 * 16; // 1MB
pInfo->bufPageSize = 1024; pInfo->bufPageSize = 1024;
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); pInfo->orderInfo = createBlockOrder(pExprInfo, num, pOrderVal);
pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);
...@@ -6122,12 +6107,12 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t ...@@ -6122,12 +6107,12 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = num;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSortedMerge; pOperator->getNextFn = doSortedMerge;
pOperator->closeFn = destroySortedMergeOperatorInfo; pOperator->closeFn = destroySortedMergeOperatorInfo;
code = appendDownstream(pOperator, downstream, numOfDownstream); code = appendDownstream(pOperator, downstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -6138,7 +6123,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t ...@@ -6138,7 +6123,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroySortedMergeOperatorInfo(pInfo, numOfOutput); destroySortedMergeOperatorInfo(pInfo, num);
} }
tfree(pInfo); tfree(pInfo);
...@@ -6180,7 +6165,7 @@ static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -6180,7 +6165,7 @@ static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes); return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
} }
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) { SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -6194,12 +6179,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI ...@@ -6194,12 +6179,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pInfo->bufPageSize = 1024; pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024; pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); pInfo->orderInfo = createBlockOrder(pExprInfo, numOfCols, pOrderVal);
pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); if (IS_VAR_DATA_TYPE(pExprInfo[i].base.resSchema.type)) {
if (IS_VAR_DATA_TYPE(pExpr->base.resSchema.type)) {
pInfo->hasVarCol = true; pInfo->hasVarCol = true;
break; break;
} }
...@@ -6207,7 +6190,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI ...@@ -6207,7 +6190,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) { if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) {
tfree(pOperator); tfree(pOperator);
destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo)); destroyOrderOperatorInfo(pInfo, numOfCols);
tfree(pInfo); tfree(pInfo);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
...@@ -6359,51 +6342,48 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgro ...@@ -6359,51 +6342,48 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgro
static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) {
SProjectOperatorInfo* pProjectInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
int32_t order = pRuntimeEnv->pQueryAttr->order.order; blockDataClearup(pRes, pProjectInfo->hasVarCol);
pRes->info.rows = 0;
if (pProjectInfo->existDataBlock) { // TODO refactor if (pProjectInfo->existDataBlock) { // TODO refactor
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SSDataBlock* pBlock = pProjectInfo->existDataBlock; SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pProjectInfo->existDataBlock = NULL; pProjectInfo->existDataBlock = NULL;
*newgroup = true; *newgroup = true;
// todo dynamic set tags // todo dynamic set tags
if (pTableQueryInfo != NULL) { // if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
} // }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pInfo->pCtx, pOperator->numOfOutput);
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pProjectInfo->threshold) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
return pRes; return pRes;
} }
} }
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
bool prevVal = *newgroup; bool prevVal = *newgroup;
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
assert(*newgroup == false); assert(*newgroup == false);
*newgroup = prevVal; *newgroup = prevVal;
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break; break;
...@@ -6422,25 +6402,26 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6422,25 +6402,26 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
} }
} }
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// todo dynamic set tags // todo dynamic set tags
if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
} // if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pInfo->pCtx, pOperator->numOfOutput);
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); // pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pRes);
if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) { if (pRes->info.rows >= pProjectInfo->threshold) {
break; break;
} }
} }
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); // resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
...@@ -7169,12 +7150,12 @@ static void clearupAggSup(SAggSupporter* pAggSup) { ...@@ -7169,12 +7150,12 @@ static void clearupAggSup(SAggSupporter* pAggSup) {
destroyResultRowPool(pAggSup->pool); destroyResultRowPool(pAggSup->pool);
} }
static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows, SSDataBlock* pResultBlock, const STableGroupInfo* pTableGroupInfo) { static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, const STableGroupInfo* pTableGroupInfo) {
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = pResultBlock; pInfo->binfo.pRes = pResultBlock;
pInfo->binfo.capacity = numOfRows; pInfo->binfo.capacity = numOfRows;
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, taosArrayGetSize(pExprInfo)); doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
int32_t index = 0; int32_t index = 0;
...@@ -7196,14 +7177,14 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n ...@@ -7196,14 +7177,14 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1; int32_t numOfRows = 1;
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
initAggInfo(pInfo, pExprInfo, numOfRows, pResultBlock, pTableGroupInfo); initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResultBlock, pTableGroupInfo);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -7212,8 +7193,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE ...@@ -7212,8 +7193,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = doOpenAggregateOptr; pOperator->_openFn = doOpenAggregateOptr;
...@@ -7317,12 +7298,11 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -7317,12 +7298,11 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
tsem_destroy(&pExInfo->ready); tsem_destroy(&pExInfo->ready);
} }
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1; int32_t numOfRows = 1;
size_t numOfOutput = taosArrayGetSize(pExprInfo); initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResBlock, pTableGroupInfo);
initAggInfo(pInfo, pExprInfo, numOfRows, pResBlock, pTableGroupInfo);
size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList); size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup);
...@@ -7333,8 +7313,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray ...@@ -7333,8 +7313,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doMultiTableAggregate; pOperator->getNextFn = doMultiTableAggregate;
...@@ -7344,12 +7324,11 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray ...@@ -7344,12 +7324,11 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
return pOperator; return pOperator;
} }
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
int32_t numOfRows = 4096; pInfo->binfo.pRes = pResBlock;
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
// initResultRowInfo(&pBInfo->resultRowInfo, 8); // initResultRowInfo(&pBInfo->resultRowInfo, 8);
// setFunctionResultOutput(pBInfo, MAIN_SCAN); // setFunctionResultOutput(pBInfo, MAIN_SCAN);
...@@ -7360,11 +7339,14 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp ...@@ -7360,11 +7339,14 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = num;
pOperator->getNextFn = doProjectOperation; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doProjectOperation;
pOperator->closeFn = destroyProjectOperatorInfo; pOperator->closeFn = destroyProjectOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -7421,11 +7403,10 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -7421,11 +7403,10 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator; return pOperator;
} }
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
size_t numOfOutput = taosArrayGetSize(pExprInfo); doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput);
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->precision = TSDB_TIME_PRECISION_MICRO; pInfo->precision = TSDB_TIME_PRECISION_MICRO;
...@@ -7434,8 +7415,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx ...@@ -7434,8 +7415,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
...@@ -7445,10 +7425,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx ...@@ -7445,10 +7425,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx
// pOperator->operatorType = OP_TimeWindow; // pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->getNextFn = doIntervalAgg; pOperator->getNextFn = doIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
...@@ -8104,12 +8084,14 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i ...@@ -8104,12 +8084,14 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
return s; return s;
} }
SArray* createExprInfo(SAggPhysiNode* pPhyNode) { SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) {
int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs); int32_t numOfFuncs = LIST_LENGTH(pNodeList);
*numOfExprs = numOfFuncs;
SExprInfo* pExprs = calloc(numOfFuncs, sizeof(SExprInfo));
SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES); for(int32_t i = 0; i < numOfFuncs; ++i) {
for(int32_t i = 0; i < numOfAggFuncs; ++i) { SExprInfo* pExp = &pExprs[i];
SExprInfo* pExp = calloc(1, sizeof(SExprInfo));
pExp->pExpr = calloc(1, sizeof(tExprNode)); pExp->pExpr = calloc(1, sizeof(tExprNode));
pExp->pExpr->_function.num = 1; pExp->pExpr->_function.num = 1;
...@@ -8120,7 +8102,7 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) { ...@@ -8120,7 +8102,7 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) {
pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn)); pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn));
SColumn* pCol = pExp->base.pParam[0].pCol; SColumn* pCol = pExp->base.pParam[0].pCol;
STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pPhyNode->pAggFuncs, i); STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pNodeList, i);
ASSERT(pTargetNode->slotId == i); ASSERT(pTargetNode->slotId == i);
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
...@@ -8145,10 +8127,9 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) { ...@@ -8145,10 +8127,9 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) {
pCol->precision = pcn->node.resType.precision; pCol->precision = pcn->node.resType.precision;
pCol->dataBlockId = pcn->dataBlockId; pCol->dataBlockId = pcn->dataBlockId;
} }
taosArrayPush(pArray, &pExp);
} }
return pArray; return pExprs;
} }
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
...@@ -8208,7 +8189,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa ...@@ -8208,7 +8189,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
} }
} }
if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1); assert(size == 1);
...@@ -8216,9 +8210,10 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa ...@@ -8216,9 +8210,10 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode); int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SAggPhysiNode*)pPhyNode)->pAggFuncs, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
} }
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren); size_t size = taosArrayGetSize(pPhyNode->pChildren);
......
...@@ -55,8 +55,7 @@ typedef struct SDummyInputInfo { ...@@ -55,8 +55,7 @@ typedef struct SDummyInputInfo {
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SDummyInputInfo; } SDummyInputInfo;
SSDataBlock* getDummyBlock(void* param, bool* newgroup) { SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(param);
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info); SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) { if (pInfo->current >= pInfo->totalPages) {
return NULL; return NULL;
...@@ -122,8 +121,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { ...@@ -122,8 +121,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
return pBlock; return pBlock;
} }
SSDataBlock* get2ColsDummyBlock(void* param, bool* newgroup) { SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(param);
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info); SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) { if (pInfo->current >= pInfo->totalPages) {
return NULL; return NULL;
......
...@@ -46,13 +46,6 @@ extern SAggFunctionInfo aggFunc[35]; ...@@ -46,13 +46,6 @@ extern SAggFunctionInfo aggFunc[35];
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value #define DATA_SET_FLAG ',' // to denote the output area has data, not null value
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG) #define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
#define TOP_BOTTOM_QUERY_LIMIT 100
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_QUERY))
#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0)
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
typedef struct SInterpInfoDetail { typedef struct SInterpInfoDetail {
TSKEY ts; // interp specified timestamp TSKEY ts; // interp specified timestamp
int8_t type; int8_t type;
...@@ -61,9 +54,6 @@ typedef struct SInterpInfoDetail { ...@@ -61,9 +54,6 @@ typedef struct SInterpInfoDetail {
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo))) #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
typedef struct STwaInfo { typedef struct STwaInfo {
int8_t hasResult; // flag to denote has value int8_t hasResult; // flag to denote has value
double dOutput; double dOutput;
...@@ -71,8 +61,6 @@ typedef struct STwaInfo { ...@@ -71,8 +61,6 @@ typedef struct STwaInfo {
STimeWindow win; STimeWindow win;
} STwaInfo; } STwaInfo;
extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval); bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
/** /**
......
#include "os.h"
#include "tarray.h"
#include "function.h"
#include "thash.h"
#include "taggfunction.h"
static SHashObj* functionHashTable = NULL;
static SHashObj* udfHashTable = NULL;
static void doInitFunctionHashTable() {
int numOfEntries = tListLen(aggFunc);
functionHashTable = taosHashInit(numOfEntries, MurmurHash3_32, false, false);
for (int32_t i = 0; i < numOfEntries; i++) {
int32_t len = (uint32_t)strlen(aggFunc[i].name);
SAggFunctionInfo* ptr = &aggFunc[i];
taosHashPut(functionHashTable, aggFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
}
/*
numOfEntries = tListLen(scalarFunc);
for(int32_t i = 0; i < numOfEntries; ++i) {
int32_t len = (int32_t) strlen(scalarFunc[i].name);
SScalarFunctionInfo* ptr = &scalarFunc[i];
taosHashPut(functionHashTable, scalarFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
}
*/
udfHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, true);
}
static pthread_once_t functionHashTableInit = PTHREAD_ONCE_INIT;
int32_t qIsBuiltinFunction(const char* name, int32_t len, bool* scalarFunction) {
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, name, len);
if (pInfo != NULL) {
*scalarFunction = ((*pInfo)->type == FUNCTION_TYPE_SCALAR);
return (*pInfo)->functionId;
} else {
return -1;
}
}
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId) {
return true;
}
bool qIsAggregateFunction(const char* functionName) {
assert(functionName != NULL);
bool scalarfunc = false;
qIsBuiltinFunction(functionName, strlen(functionName), &scalarfunc);
return !scalarfunc;
}
bool qIsSelectivityFunction(const char* functionName) {
assert(functionName != NULL);
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
size_t len = strlen(functionName);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, functionName, len);
if (pInfo != NULL) {
return ((*pInfo)->status | FUNCSTATE_SELECTIVITY) != 0;
}
return false;
}
SAggFunctionInfo* qGetFunctionInfo(const char* name, int32_t len) {
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, name, len);
if (pInfo != NULL) {
return (*pInfo);
} else {
return NULL;
}
}
void qAddUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) {
int32_t len = (uint32_t)strlen(pUdfInfo->name);
taosHashPut(udfHashTable, pUdfInfo->name, len, (void*)&pUdfInfo, POINTER_BYTES);
}
void qRemoveUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) {
int32_t len = (uint32_t)strlen(pUdfInfo->name);
taosHashRemove(udfHashTable, pUdfInfo->name, len);
}
bool isTagsQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
// todo handle count(tbname) query
if (strcmp(f, "project") != 0 && strcmp(f, "count") != 0) {
return false;
}
// "select count(tbname)" query
// if (functId == FUNCTION_COUNT && pExpr->base.colpDesc->colId == TSDB_TBNAME_COLUMN_INDEX) {
// continue;
// }
}
return true;
}
//bool tscMultiRoundQuery(SArray* pFunctionIdList, int32_t index) {
// if (!UTIL_TABLE_IS_SUPER_TABLE(pQueryInfo->pTableMetaInfo[index])) {
// return false;
// }
//
// size_t numOfExprs = (int32_t) getNumOfExprs(pQueryInfo);
// for(int32_t i = 0; i < numOfExprs; ++i) {
// SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
// if (pExpr->base.functionId == FUNCTION_STDDEV_DST) {
// return true;
// }
// }
//
// return false;
//}
bool isProjectionQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
return true;
}
}
return false;
}
bool isDiffDerivativeQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TS_DUMMY) {
continue;
}
if (f == FUNCTION_DIFF || f == FUNCTION_DERIVATIVE) {
return true;
}
}
return false;
}
bool isInterpQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TAG || f == FUNCTION_TS) {
continue;
}
if (f != FUNCTION_INTERP) {
return false;
}
}
return true;
}
bool isArithmeticQueryOnAggResult(SArray* pFunctionIdList) {
if (isProjectionQuery(pFunctionIdList)) {
return false;
}
assert(0);
// size_t numOfOutput = getNumOfFields(pQueryInfo);
// for(int32_t i = 0; i < numOfOutput; ++i) {
// SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pExpr;
// if (pExprInfo->pExpr != NULL) {
// return true;
// }
// }
return false;
}
bool isGroupbyColumn(SGroupbyExpr* pGroupby) {
return !pGroupby->groupbyTag;
}
bool isTopBotQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
continue;
}
if (strcmp(f, "top") == 0 || strcmp(f, "bottom") == 0) {
return true;
}
}
return false;
}
bool isTsCompQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
if (num != 1) {
return false;
}
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, 0);
return f == FUNCTION_TS_COMP;
}
bool isTWAQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TWA) {
return true;
}
}
return false;
}
bool isIrateQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_IRATE) {
return true;
}
}
return false;
}
bool isStabledev(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_STDDEV_DST) {
return true;
}
}
return false;
}
bool needReverseScan(SArray* pFunctionIdList) {
assert(0);
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {
int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
if (f == FUNCTION_TS || f == FUNCTION_TS_DUMMY || f == FUNCTION_TAG) {
continue;
}
// if ((f == FUNCTION_FIRST || f == FUNCTION_FIRST_DST) && pQueryInfo->order.order == TSDB_ORDER_DESC) {
// return true;
// }
if (f == FUNCTION_LAST || f == FUNCTION_LAST_DST) {
// the scan order to acquire the last result of the specified column
// int32_t order = (int32_t)pExpr->base.param[0].i64;
// if (order != pQueryInfo->order.order) {
// return true;
// }
}
}
return false;
}
bool isAgg(SArray* pFunctionIdList) {
size_t size = taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < size; ++i) {
char* f = *(char**) taosArrayGet(pFunctionIdList, i);
if (strcmp(f, "project") == 0) {
return false;
}
if (qIsAggregateFunction(f)) {
return true;
}
}
return false;
}
bool isBlockDistQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
char* f = *(char**) taosArrayGet(pFunctionIdList, 0);
return (num == 1 && strcmp(f, "block_dist") == 0);
}
bool isTwoStageSTableQuery(SArray* pFunctionIdList, int32_t tableIndex) {
// if (pQueryInfo == NULL) {
// return false;
// }
//
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
// if (pTableMetaInfo == NULL) {
// return false;
// }
//
// if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
// return false;
// }
//
// // for ordered projection query, iterate all qualified vnodes sequentially
// if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
// return false;
// }
//
// if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->command == TSDB_SQL_SELECT) {
// return UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
// }
return false;
}
bool isProjectionQueryOnSTable(SArray* pFunctionIdList, int32_t tableIndex) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
//
// /*
// * In following cases, return false for non ordered project query on super table
// * 1. failed to get tableMeta from server; 2. not a super table; 3. limitation is 0;
// * 4. show queries, instead of a select query
// */
// size_t numOfExprs = getNumOfExprs(pQueryInfo);
// if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) ||
// pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || numOfExprs == 0) {
// return false;
// }
//
// for (int32_t i = 0; i < numOfExprs; ++i) {
// int32_t functionId = getExprInfo(pQueryInfo, i)->base.functionId;
//
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
// if (pUdfInfo->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
// return false;
// }
//
// continue;
// }
//
// if (functionId != FUNCTION_PRJ &&
// functionId != FUNCTION_TAGPRJ &&
// functionId != FUNCTION_TAG &&
// functionId != FUNCTION_TS &&
// functionId != FUNCTION_ARITHM &&
// functionId != FUNCTION_TS_COMP &&
// functionId != FUNCTION_DIFF &&
// functionId != FUNCTION_DERIVATIVE &&
// functionId != FUNCTION_TS_DUMMY &&
// functionId != FUNCTION_TID_TAG) {
// return false;
// }
// }
return true;
}
bool hasTagValOutput(SArray* pFunctionIdList) {
size_t size = taosArrayGetSize(pFunctionIdList);
// if (numOfExprs == 1 && pExpr1->base.functionId == FUNCTION_TS_COMP) {
// return true;
// }
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = *(int16_t*) taosArrayGet(pFunctionIdList, i);
// ts_comp column required the tag value for join filter
if (functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
return true;
}
}
return false;
}
//bool timeWindowInterpoRequired(SArray* pFunctionIdList) {
// int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
// for (int32_t i = 0; i < num; ++i) {
// int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i);
// if (f == FUNCTION_TWA || f == FUNCTION_INTERP) {
// return true;
// }
// }
//
// return false;
//}
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) {
assert(pFunctionIdList != NULL);
pDesc->blockDistribution = isBlockDistQuery(pFunctionIdList);
if (pDesc->blockDistribution) {
return;
}
// pDesc->projectionQuery = isProjectionQuery(pFunctionIdList);
// pDesc->onlyTagQuery = isTagsQuery(pFunctionIdList);
pDesc->interpQuery = isInterpQuery(pFunctionIdList);
pDesc->topbotQuery = isTopBotQuery(pFunctionIdList);
pDesc->agg = isAgg(pFunctionIdList);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册