提交 bdf03d8f 编写于 作者: H Haojun Liao

[td-11818] refactor.

上级 b45da2b2
...@@ -165,6 +165,7 @@ typedef struct SQLFunctionCtx { ...@@ -165,6 +165,7 @@ typedef struct SQLFunctionCtx {
SPoint1 start; SPoint1 start;
SPoint1 end; SPoint1 end;
int32_t columnIndex;
SFunctionFpSet* fpSet; SFunctionFpSet* fpSet;
} SQLFunctionCtx; } SQLFunctionCtx;
......
...@@ -414,6 +414,10 @@ typedef struct STagScanInfo { ...@@ -414,6 +414,10 @@ typedef struct STagScanInfo {
int32_t curPos; int32_t curPos;
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo {
} SStreamBlockScanInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
int32_t *rowCellInfoOffset; // offset value for each row result cell info int32_t *rowCellInfoOffset; // offset value for each row result cell info
...@@ -554,13 +558,12 @@ typedef struct SOrderOperatorInfo { ...@@ -554,13 +558,12 @@ typedef struct SOrderOperatorInfo {
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
} SOrderOperatorInfo; } SOrderOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
......
...@@ -73,15 +73,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ ...@@ -73,15 +73,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
assert(tsdb != NULL && pSubplan != NULL); assert(tsdb != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
int32_t code = 0; int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
uint64_t uid = 0;
STimeWindow window = TSWINDOW_INITIALIZER;
int32_t tableType = 0;
SPhyNode* pPhyNode = pSubplan->pNode;
// STableGroupInfo groupInfo = {0};
code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -12,11 +12,12 @@ ...@@ -12,11 +12,12 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <parser.h>
#include "exception.h"
#include "os.h" #include "os.h"
#include "tmsg.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h"
#include "ttime.h" #include "ttime.h"
#include "exception.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "function.h" #include "function.h"
...@@ -176,7 +177,7 @@ static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult ...@@ -176,7 +177,7 @@ static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult
void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx); static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static bool hasMainOutput(STaskAttr *pQueryAttr); static bool hasMainOutput(STaskAttr *pQueryAttr);
...@@ -320,7 +321,7 @@ SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) { ...@@ -320,7 +321,7 @@ SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {{0}}; SColumnInfoData idata = {{0}};
SExprInfo* pExpr = taosArrayGet(pExprInfo, i); SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
idata.info.type = pExpr->base.resSchema.type; idata.info.type = pExpr->base.resSchema.type;
idata.info.bytes = pExpr->base.resSchema.bytes; idata.info.bytes = pExpr->base.resSchema.bytes;
...@@ -382,8 +383,8 @@ static bool isProjQuery(STaskAttr *pQueryAttr) { ...@@ -382,8 +383,8 @@ static bool isProjQuery(STaskAttr *pQueryAttr) {
return true; return true;
} }
static bool hasNull(SColIndex* pColIndex, SColumnDataAgg *pStatis) { static bool hasNull(SColumn* pColumn, SColumnDataAgg *pStatis) {
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
return false; return false;
} }
...@@ -1122,43 +1123,52 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pC ...@@ -1122,43 +1123,52 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pC
} }
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
if (pCtx[0].functionId == FUNCTION_ARITHM) { // if (pCtx[0].functionId == FUNCTION_ARITHM) {
// SScalar* pSupport = (SScalarFunctionSupport*) pCtx[0].param[1].pz; // SScalar* pSupport = (SScalarFunctionSupport*) pCtx[0].param[1].pz;
// if (pSupport->colList == NULL) { // if (pSupport->colList == NULL) {
// doSetInputDataBlock(pOperator, pCtx, pBlock, order); // doSetInputDataBlock(pOperator, pCtx, pBlock, order);
// } else { // } else {
// doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); // doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
// } // }
} else { // } else {
if (pBlock->pDataBlock != NULL) { if (pBlock->pDataBlock != NULL) {
doSetInputDataBlock(pOperator, pCtx, pBlock, order); doSetInputDataBlock(pOperator, pCtx, pBlock, order);
} else { } else {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
} }
} // }
} }
static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
#if 0
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows; pCtx[i].size = pBlock->info.rows;
pCtx[i].currentStage = (uint8_t)pOperator->pRuntimeEnv->scanFlag; pCtx[i].currentStage = MAIN_SCAN/*(uint8_t)pOperator->pRuntimeEnv->scanFlag*/;
setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
if (pCtx[i].functionId == FUNCTION_ARITHM) { if (pCtx[i].functionId == FUNCTION_ARITHM) {
// setArithParams((SScalarFunctionSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); // setArithParams((SScalarFunctionSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else { } else {
SColIndex* pCol = &pOperator->pExpr[i].base.pColumns->info.; uint32_t flag = pOperator->pExpr[i].base.pColumns->flag;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCtx[i].functionId == FUNCTION_BLKINFO) || if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
(TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); SColumn* pCol = pOperator->pExpr[i].base.pColumns;
if (pCtx[i].columnIndex == -1) {
for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
if (pColData->info.colId == pCol->info.colId) {
pCtx[i].columnIndex = j;
break;
}
}
}
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pCtx[i].columnIndex);
// in case of the block distribution query, the inputBytes is not a constant value. // in case of the block distribution query, the inputBytes is not a constant value.
pCtx[i].pInput = p->pData; pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); assert(p->info.colId == pCol->info.colId);
if (pCtx[i].functionId < 0) { if (pCtx[i].functionId < 0) {
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
...@@ -1169,30 +1179,28 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, ...@@ -1169,30 +1179,28 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
// uint32_t status = aAggs[pCtx[i].functionId].status; // uint32_t status = aAggs[pCtx[i].functionId].status;
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
// // In case of the top/bottom query again the nest query result, which has no timestamp column // In case of the top/bottom query again the nest query result, which has no timestamp column
// // don't set the ptsList attribute. // don't set the ptsList attribute.
// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
// pCtx[i].ptsList = (int64_t*) tsInfo->pData; pCtx[i].ptsList = (int64_t*) tsInfo->pData;
// } else { } else {
// pCtx[i].ptsList = NULL; pCtx[i].ptsList = NULL;
// } }
// }
// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
// SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
//
// pCtx[i].pInput = p->pData;
// assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
// for(int32_t j = 0; j < pBlock->info.rows; ++j) {
// char* dst = p->pData + j * p->info.bytes;
// taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
// } // }
} else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
for(int32_t j = 0; j < pBlock->info.rows; ++j) {
char* dst = p->pData + j * p->info.bytes;
taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
}
} }
} }
} }
#endif
} }
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
...@@ -1837,19 +1845,19 @@ static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * ...@@ -1837,19 +1845,19 @@ static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *
} }
// in the reverse table scan, only the following functions need to be executed // in the reverse table scan, only the following functions need to be executed
if (IS_REVERSE_SCAN(pRuntimeEnv) || // if (IS_REVERSE_SCAN(pRuntimeEnv) ||
(pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) { // (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) {
return false; // return false;
} // }
return true; return true;
} }
void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex) { void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn) {
SColumnDataAgg *pAgg = NULL; SColumnDataAgg *pAgg = NULL;
if (pSDataBlock->pBlockAgg != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) { if (pSDataBlock->pBlockAgg != NULL && TSDB_COL_IS_NORMAL_COL(pColumn->flag)) {
pAgg = &pSDataBlock->pBlockAgg[pColIndex->colIndex]; pAgg = &pSDataBlock->pBlockAgg[pCtx->columnIndex];
pCtx->agg = *pAgg; pCtx->agg = *pAgg;
pCtx->isAggSet = true; pCtx->isAggSet = true;
...@@ -1858,10 +1866,10 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde ...@@ -1858,10 +1866,10 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde
pCtx->isAggSet = false; pCtx->isAggSet = false;
} }
pCtx->hasNull = hasNull(pColIndex, pAgg); pCtx->hasNull = hasNull(pColumn, pAgg);
// set the statistics data for primary time stamp column // set the statistics data for primary time stamp column
if (pCtx->functionId == FUNCTION_SPREAD && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (pCtx->functionId == FUNCTION_SPREAD && pColumn->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pCtx->isAggSet = true; pCtx->isAggSet = true;
pCtx->agg.min = pSDataBlock->info.window.skey; pCtx->agg.min = pSDataBlock->info.window.skey;
pCtx->agg.max = pSDataBlock->info.window.ekey; pCtx->agg.max = pSDataBlock->info.window.ekey;
...@@ -2033,6 +2041,7 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC ...@@ -2033,6 +2041,7 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
SSqlExpr *pSqlExpr = &pExpr->base; SSqlExpr *pSqlExpr = &pExpr->base;
SQLFunctionCtx* pCtx = &pFuncCtx[i]; SQLFunctionCtx* pCtx = &pFuncCtx[i];
#if 0 #if 0
SColIndex *pIndex = &pSqlExpr->colInfo; SColIndex *pIndex = &pSqlExpr->colInfo;
...@@ -2043,16 +2052,16 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC ...@@ -2043,16 +2052,16 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
pCtx->requireNull = false; pCtx->requireNull = false;
} }
#endif #endif
// pCtx->inputBytes = pSqlExpr->colBytes; // pCtx->inputBytes = pSqlExpr->;
// pCtx->inputType = pSqlExpr->colType; // pCtx->inputType = pSqlExpr->colType;
pCtx->ptsOutputBuf = NULL; pCtx->ptsOutputBuf = NULL;
pCtx->fpSet = fpSet; pCtx->fpSet = fpSet;
pCtx->columnIndex = -1;
pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes; pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes;
pCtx->resDataInfo.type = pSqlExpr->resSchema.type; pCtx->resDataInfo.type = pSqlExpr->resSchema.type;
// pCtx->order = pQueryAttr->order.order; pCtx->order = TSDB_ORDER_ASC;
// pCtx->functionId = pSqlExpr->functionId; // pCtx->functionId = pSqlExpr->functionId;
// pCtx->stableQuery = pQueryAttr->stableQuery; // pCtx->stableQuery = pQueryAttr->stableQuery;
pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes;
...@@ -2108,12 +2117,12 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC ...@@ -2108,12 +2117,12 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
} }
} }
// for(int32_t i = 1; i < numOfOutput; ++i) { for(int32_t i = 1; i < numOfOutput; ++i) {
// (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr[i - 1].base.interBytes); SExprInfo* pExpr = taosArrayGetP(pExprInfo, i - 1);
// } (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr->base.interBytes);
}
setCtxTagColumnInfo(pFuncCtx, numOfOutput); setCtxTagColumnInfo(pFuncCtx, numOfOutput);
return pFuncCtx; return pFuncCtx;
} }
...@@ -3717,49 +3726,49 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SQLFunctionCt ...@@ -3717,49 +3726,49 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SQLFunctionCt
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperator->numOfOutput; int32_t numOfOutput = pOperator->numOfOutput;
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { // if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
// for each group result, call the finalize function for each column // // for each group result, call the finalize function for each column
if (pQueryAttr->groupbyColumn) { // if (pQueryAttr->groupbyColumn) {
closeAllResultRows(pResultRowInfo); // closeAllResultRows(pResultRowInfo);
} // }
//
for (int32_t i = 0; i < pResultRowInfo->size; ++i) { // for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow *buf = pResultRowInfo->pResult[i]; // SResultRow *buf = pResultRowInfo->pResult[i];
if (!isResultRowClosed(pResultRowInfo, i)) { // if (!isResultRowClosed(pResultRowInfo, i)) {
continue; // continue;
} // }
//
setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset); // setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset);
//
for (int32_t j = 0; j < numOfOutput; ++j) { // for (int32_t j = 0; j < numOfOutput; ++j) {
// pCtx[j].startTs = buf->win.skey; //// pCtx[j].startTs = buf->win.skey;
// if (pCtx[j].functionId < 0) { //// if (pCtx[j].functionId < 0) {
// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); //// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
// } else { //// } else {
// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); //// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
// } //// }
} // }
//
//
/* // /*
* set the number of output results for group by normal columns, the number of output rows usually is 1 except // * set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query // * the top and bottom query
*/ // */
buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); // buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
} // }
//
} else { // } else {
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
// if (pCtx[j].functionId < 0) { // if (pCtx[j].functionId < 0) {
// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); // doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
// } else { // } else {
// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); pCtx[j].fpSet->finalize(&pCtx[j]);
// } // }
} }
} // }
} }
static bool hasMainOutput(STaskAttr *pQueryAttr) { static bool hasMainOutput(STaskAttr *pQueryAttr) {
...@@ -5362,6 +5371,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt ...@@ -5362,6 +5371,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator; return pOperator;
} }
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
}
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
assert(pTableScanInfo != NULL && pDownstream != NULL); assert(pTableScanInfo != NULL && pDownstream != NULL);
...@@ -5752,9 +5766,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -5752,9 +5766,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
break; break;
} }
if (pAggInfo->current != NULL) { // if (pAggInfo->current != NULL) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
} // }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
...@@ -6667,8 +6681,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE ...@@ -6667,8 +6681,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGet(pExprInfo, i); SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
memcpy(&p[i], pExpr, sizeof(SExprInfo)); assignExprInfo(&p[i], pExpr);
} }
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册