diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 00f71f103f0db4ba3b32652ff5cbcf52d4cd42dd..39d8ec34203accf5a8842a733334c5ec9ef1318e 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -165,6 +165,7 @@ typedef struct SQLFunctionCtx { SPoint1 start; SPoint1 end; + int32_t columnIndex; SFunctionFpSet* fpSet; } SQLFunctionCtx; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 85f7b9e42c7dde01055eeb7cc494a57ba90da23b..dd633002db0d5f84b0dd441bc356358d84f871cb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -414,6 +414,10 @@ typedef struct STagScanInfo { int32_t curPos; } STagScanInfo; +typedef struct SStreamBlockScanInfo { + +} SStreamBlockScanInfo; + typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; int32_t *rowCellInfoOffset; // offset value for each row result cell info @@ -554,13 +558,12 @@ typedef struct SOrderOperatorInfo { SSDataBlock *pDataBlock; } SOrderOperatorInfo; -void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); - SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); +SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 8b7370f9c13e2e463fa0f3e3e1959404d747df93..f39df4d4ae69753ad7c111ee3e75e17c64c5a1b9 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -73,15 +73,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ assert(tsdb != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = 0; - uint64_t uid = 0; - STimeWindow window = TSWINDOW_INITIALIZER; - int32_t tableType = 0; - - SPhyNode* pPhyNode = pSubplan->pNode; -// STableGroupInfo groupInfo = {0}; - - code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); + int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 97dc7e1dd58b5f02f355bde95eb2ede25914f635..2b5faba29d322ae6ae105d507ab1f61c3d4587ee 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -12,11 +12,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include +#include "exception.h" #include "os.h" -#include "tmsg.h" #include "tglobal.h" +#include "tmsg.h" #include "ttime.h" -#include "exception.h" #include "executorimpl.h" #include "function.h" @@ -176,7 +177,7 @@ static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx); -static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); +static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static bool hasMainOutput(STaskAttr *pQueryAttr); @@ -320,7 +321,7 @@ SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) { for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData idata = {{0}}; - SExprInfo* pExpr = taosArrayGet(pExprInfo, i); + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); idata.info.type = pExpr->base.resSchema.type; idata.info.bytes = pExpr->base.resSchema.bytes; @@ -382,8 +383,8 @@ static bool isProjQuery(STaskAttr *pQueryAttr) { return true; } -static bool hasNull(SColIndex* pColIndex, SColumnDataAgg *pStatis) { - if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { +static bool hasNull(SColumn* pColumn, SColumnDataAgg *pStatis) { + if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { return false; } @@ -1122,43 +1123,52 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pC } void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { - if (pCtx[0].functionId == FUNCTION_ARITHM) { +// if (pCtx[0].functionId == FUNCTION_ARITHM) { // SScalar* pSupport = (SScalarFunctionSupport*) pCtx[0].param[1].pz; // if (pSupport->colList == NULL) { // doSetInputDataBlock(pOperator, pCtx, pBlock, order); // } else { // doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); // } - } else { +// } else { if (pBlock->pDataBlock != NULL) { doSetInputDataBlock(pOperator, pCtx, pBlock, order); } else { doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); } - } +// } } static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { -#if 0 for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; - pCtx[i].currentStage = (uint8_t)pOperator->pRuntimeEnv->scanFlag; + pCtx[i].currentStage = MAIN_SCAN/*(uint8_t)pOperator->pRuntimeEnv->scanFlag*/; - setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); + setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns); if (pCtx[i].functionId == FUNCTION_ARITHM) { // setArithParams((SScalarFunctionSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); } else { - SColIndex* pCol = &pOperator->pExpr[i].base.pColumns->info.; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCtx[i].functionId == FUNCTION_BLKINFO) || - (TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { - SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; - SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); + uint32_t flag = pOperator->pExpr[i].base.pColumns->flag; + if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) || + (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) { + + SColumn* pCol = pOperator->pExpr[i].base.pColumns; + if (pCtx[i].columnIndex == -1) { + for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); + if (pColData->info.colId == pCol->info.colId) { + pCtx[i].columnIndex = j; + break; + } + } + } + SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pCtx[i].columnIndex); // in case of the block distribution query, the inputBytes is not a constant value. pCtx[i].pInput = p->pData; - assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); + assert(p->info.colId == pCol->info.colId); if (pCtx[i].functionId < 0) { SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ -1169,30 +1179,28 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, // uint32_t status = aAggs[pCtx[i].functionId].status; // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { -// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); -// // In case of the top/bottom query again the nest query result, which has no timestamp column -// // don't set the ptsList attribute. -// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { -// pCtx[i].ptsList = (int64_t*) tsInfo->pData; -// } else { -// pCtx[i].ptsList = NULL; -// } + SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); + // In case of the top/bottom query again the nest query result, which has no timestamp column + // don't set the ptsList attribute. + if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + pCtx[i].ptsList = (int64_t*) tsInfo->pData; + } else { + pCtx[i].ptsList = NULL; + } +// } +// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { +// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; +// SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); +// +// pCtx[i].pInput = p->pData; +// assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); +// for(int32_t j = 0; j < pBlock->info.rows; ++j) { +// char* dst = p->pData + j * p->info.bytes; +// taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true); // } - } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { - SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; - SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); - - pCtx[i].pInput = p->pData; - assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); - for(int32_t j = 0; j < pBlock->info.rows; ++j) { - char* dst = p->pData + j * p->info.bytes; - taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true); - } } } } -#endif - } static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { @@ -1837,19 +1845,19 @@ static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx * } // in the reverse table scan, only the following functions need to be executed - if (IS_REVERSE_SCAN(pRuntimeEnv) || - (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) { - return false; - } +// if (IS_REVERSE_SCAN(pRuntimeEnv) || +// (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) { +// return false; +// } return true; } -void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex) { +void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn) { SColumnDataAgg *pAgg = NULL; - if (pSDataBlock->pBlockAgg != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) { - pAgg = &pSDataBlock->pBlockAgg[pColIndex->colIndex]; + if (pSDataBlock->pBlockAgg != NULL && TSDB_COL_IS_NORMAL_COL(pColumn->flag)) { + pAgg = &pSDataBlock->pBlockAgg[pCtx->columnIndex]; pCtx->agg = *pAgg; pCtx->isAggSet = true; @@ -1858,10 +1866,10 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde pCtx->isAggSet = false; } - pCtx->hasNull = hasNull(pColIndex, pAgg); + pCtx->hasNull = hasNull(pColumn, pAgg); // set the statistics data for primary time stamp column - if (pCtx->functionId == FUNCTION_SPREAD && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + if (pCtx->functionId == FUNCTION_SPREAD && pColumn->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { pCtx->isAggSet = true; pCtx->agg.min = pSDataBlock->info.window.skey; pCtx->agg.max = pSDataBlock->info.window.ekey; @@ -2033,6 +2041,7 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC SSqlExpr *pSqlExpr = &pExpr->base; SQLFunctionCtx* pCtx = &pFuncCtx[i]; + #if 0 SColIndex *pIndex = &pSqlExpr->colInfo; @@ -2043,16 +2052,16 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC pCtx->requireNull = false; } #endif -// pCtx->inputBytes = pSqlExpr->colBytes; +// pCtx->inputBytes = pSqlExpr->; // pCtx->inputType = pSqlExpr->colType; pCtx->ptsOutputBuf = NULL; pCtx->fpSet = fpSet; - + pCtx->columnIndex = -1; pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes; pCtx->resDataInfo.type = pSqlExpr->resSchema.type; -// pCtx->order = pQueryAttr->order.order; + pCtx->order = TSDB_ORDER_ASC; // pCtx->functionId = pSqlExpr->functionId; // pCtx->stableQuery = pQueryAttr->stableQuery; pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; @@ -2108,12 +2117,12 @@ static SQLFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC } } -// for(int32_t i = 1; i < numOfOutput; ++i) { -// (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr[i - 1].base.interBytes); -// } + for(int32_t i = 1; i < numOfOutput; ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i - 1); + (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr->base.interBytes); + } setCtxTagColumnInfo(pFuncCtx, numOfOutput); - return pFuncCtx; } @@ -3717,49 +3726,49 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SQLFunctionCt void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfOutput = pOperator->numOfOutput; - if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { - // for each group result, call the finalize function for each column - if (pQueryAttr->groupbyColumn) { - closeAllResultRows(pResultRowInfo); - } - - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { - SResultRow *buf = pResultRowInfo->pResult[i]; - if (!isResultRowClosed(pResultRowInfo, i)) { - continue; - } - - setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset); - - for (int32_t j = 0; j < numOfOutput; ++j) { -// pCtx[j].startTs = buf->win.skey; -// if (pCtx[j].functionId < 0) { -// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); -// } else { -// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); -// } - } - - - /* - * set the number of output results for group by normal columns, the number of output rows usually is 1 except - * the top and bottom query - */ - buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); - } - - } else { +// if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { +// // for each group result, call the finalize function for each column +// if (pQueryAttr->groupbyColumn) { +// closeAllResultRows(pResultRowInfo); +// } +// +// for (int32_t i = 0; i < pResultRowInfo->size; ++i) { +// SResultRow *buf = pResultRowInfo->pResult[i]; +// if (!isResultRowClosed(pResultRowInfo, i)) { +// continue; +// } +// +// setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset); +// +// for (int32_t j = 0; j < numOfOutput; ++j) { +//// pCtx[j].startTs = buf->win.skey; +//// if (pCtx[j].functionId < 0) { +//// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); +//// } else { +//// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); +//// } +// } +// +// +// /* +// * set the number of output results for group by normal columns, the number of output rows usually is 1 except +// * the top and bottom query +// */ +// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); +// } +// +// } else { for (int32_t j = 0; j < numOfOutput; ++j) { // if (pCtx[j].functionId < 0) { // doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); // } else { -// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); + pCtx[j].fpSet->finalize(&pCtx[j]); // } } - } +// } } static bool hasMainOutput(STaskAttr *pQueryAttr) { @@ -5362,6 +5371,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt return pOperator; } +SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + +} + + void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { assert(pTableScanInfo != NULL && pDownstream != NULL); @@ -5752,9 +5766,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { break; } - if (pAggInfo->current != NULL) { +// if (pAggInfo->current != NULL) { // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - } +// } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); @@ -6667,8 +6681,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { - SExprInfo* pExpr = taosArrayGet(pExprInfo, i); - memcpy(&p[i], pExpr, sizeof(SExprInfo)); + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + assignExprInfo(&p[i], pExpr); } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));