diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 031f06adfd8517b58c9531eeedee6de19ce80d71..ee07ea8eb1fd8be5c1ea81e3035e4e780507a760 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -501,12 +501,6 @@ typedef struct SProjectOperatorInfo { int64_t curOutput; } SProjectOperatorInfo; -typedef struct SLimitOperatorInfo { - SLimit limit; - int64_t currentOffset; - int64_t currentRows; -} SLimitOperatorInfo; - typedef struct SSLimitOperatorInfo { int64_t groupTotal; int64_t currentGroupOffset; @@ -525,11 +519,6 @@ typedef struct SSLimitOperatorInfo { int64_t threshold; } SSLimitOperatorInfo; -typedef struct SFilterOperatorInfo { - SSingleColumnFilterInfo* pFilterInfo; - int32_t numOfFilterCols; -} SFilterOperatorInfo; - typedef struct SFillOperatorInfo { struct SFillInfo* pFillInfo; SSDataBlock* pRes; @@ -682,22 +671,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); - void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); - -void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters); - -int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); -void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); - STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win); -STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 845e2cd87883e23b0d94eb665a4239f31a949e39..21ba328b437595892219350342476be5ff8088f5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -194,9 +194,6 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o } static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); -static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, - int32_t numOfCols, int32_t* rowCellInfoOffset); - void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); static bool functionNeedToExecute(SqlFunctionCtx* pCtx); @@ -214,8 +211,6 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); // static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win); static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); -static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); - static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); @@ -264,10 +259,6 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); -static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, - SExprInfo* pExpr); -static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, - SExprInfo* pExpr, char* val, int16_t bytes); static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo); @@ -3344,11 +3335,6 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCt offset += pLocalExprInfo->base.resSchema.bytes; } - - // todo : use index to avoid iterator all possible output columns - if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddev(pRuntimeEnv, pCtx, numOfOutput, pExprInfo); - } } // set the tsBuf start position before check each data block @@ -3544,19 +3530,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput) } } -void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity) { - SSDataBlock* pDataBlock = pBInfo->pRes; - - for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - - int32_t functionId = pBInfo->pCtx[i].functionId; - if (functionId < 0) { - memset(pBInfo->pCtx[i].pOutput, 0, pColInfo->info.bytes * (*bufCapacity)); - } - } -} - void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); @@ -3714,23 +3687,6 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow return pTableQueryInfo; } -STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) { - STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(1, sizeof(STableQueryInfo)); - - // pTableQueryInfo->win = win; - pTableQueryInfo->lastKey = win.skey; - - // set more initial size of interval/groupby query - int32_t initialSize = 16; - int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pTableQueryInfo); - return NULL; - } - - return pTableQueryInfo; -} - void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; @@ -3834,30 +3790,6 @@ void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKe pAggInfo->groupId = tableGroupId; } -void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfCols, - int32_t* rowCellInfoOffset) { - // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - SFilePage* page = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); - - int16_t offset = 0; - for (int32_t i = 0; i < numOfCols; ++i) { - pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset); - offset += pCtx[i].resDataInfo.bytes; - - int32_t functionId = pCtx[i].functionId; - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF || - functionId == FUNCTION_DERIVATIVE) { - if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; - } - - /* - * set the output buffer information and intermediate buffer, - * not all queries require the interResultBuf, such as COUNT - */ - pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); - } -} - void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -4626,73 +4558,6 @@ static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_ return terrno; } -int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator, - void* param) { - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - - STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - pQueryAttr->tsdb = tsdb; - - if (tsdb != NULL) { - int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr); - - pRuntimeEnv->groupResInfo.totalGroup = (int32_t)(pQueryAttr->stableQuery ? GET_NUM_OF_TABLEGROUP(pRuntimeEnv) : 0); - pRuntimeEnv->enableGroupData = false; - - pRuntimeEnv->pQueryAttr = pQueryAttr; - pRuntimeEnv->pTsBuf = pTsBuf; - pRuntimeEnv->cur.vgroupIndex = -1; - setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo); - - if (sourceOptr != NULL) { - assert(pRuntimeEnv->proot == NULL); - pRuntimeEnv->proot = sourceOptr; - } - - if (pTsBuf != NULL) { - int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order); - } - - int32_t ps = 4096; - getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize); - - int32_t TENMB = 1024 * 1024 * 10; - int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, "", "/tmp"); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // create runtime environment - int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables; - pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); - pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent)); - if (pQInfo->summary.queryProfEvents == NULL) { - // qDebug("QInfo:0x%"PRIx64" failed to allocate query prof events array", pQInfo->qId); - } - - pQInfo->summary.operatorProfResults = - taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK); - - if (pQInfo->summary.operatorProfResults == NULL) { - // qDebug("QInfo:0x%"PRIx64" failed to allocate operator prof results hash", pQInfo->qId); - } - - code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t)pQueryAttr->tableGroupInfo.numOfTables, pOperator, param); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // setTaskStatus(pOperator->pTaskInfo, QUERY_NOT_COMPLETED); - return TSDB_CODE_SUCCESS; -} - static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { #if 0 if (order == TSDB_ORDER_ASC) { @@ -6890,58 +6755,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; } -static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SLimitOperatorInfo* pInfo = pOperator->info; - - SSDataBlock* pBlock = NULL; - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; - - while (1) { - publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pDownstream->getNextFn(pDownstream, newgroup); - publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); - return NULL; - } - - if (pInfo->currentOffset == 0) { - break; - } else if (pInfo->currentOffset >= pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; - } else { // TODO handle the data movement - int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); - pBlock->info.rows = remain; - - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); - } - - pInfo->currentOffset = 0; - break; - } - } - - if (pInfo->currentRows + pBlock->info.rows >= pInfo->limit.limit) { - pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->currentRows); - pInfo->currentRows = pInfo->limit.limit; - - doSetOperatorCompleted(pOperator); - } else { - pInfo->currentRows += pBlock->info.rows; - } - - return pBlock; -} - static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; @@ -7791,11 +7604,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pSortInfo); } -static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { - SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*)param; - doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols); -} - static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; taosHashCleanup(pInfo->pSet); @@ -9175,41 +8983,6 @@ _complete: return code; } -int32_t cloneExprFilterInfo(SColumnFilterInfo** dst, SColumnFilterInfo* src, int32_t filterNum) { - if (filterNum <= 0) { - return TSDB_CODE_SUCCESS; - } - - *dst = taosMemoryCalloc(filterNum, sizeof(*src)); - if (*dst == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - memcpy(*dst, src, sizeof(*src) * filterNum); - - for (int32_t i = 0; i < filterNum; i++) { - if ((*dst)[i].filterstr && dst[i]->len > 0) { - void* pz = taosMemoryCalloc(1, (size_t)(*dst)[i].len + 1); - - if (pz == NULL) { - if (i == 0) { - taosMemoryFree(*dst); - } else { - freeColumnFilterInfo(*dst, i); - } - - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - memcpy(pz, (void*)src->pz, (size_t)src->len + 1); - - (*dst)[i].pz = (int64_t)pz; - } - } - - return TSDB_CODE_SUCCESS; -} - static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, int32_t numOfOutput, int32_t tagLen, bool superTable) { for (int32_t i = 0; i < numOfOutput; ++i) { @@ -9232,113 +9005,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol return TSDB_CODE_SUCCESS; } -// TODO tag length should be passed from client, refactor -int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters) { - tExprNode* expr = NULL; - - TRY(TSDB_MAX_TAG_CONDITIONS) { expr = exprTreeFromBinary(data, len); } - CATCH(code) { - CLEANUP_EXECUTE(); - return code; - } - END_TRY - - if (expr == NULL) { - // qError("failed to create expr tree"); - return TSDB_CODE_QRY_APP_ERROR; - } - - // int32_t ret = filterInitFromTree(expr, pFilters, 0); - // tExprTreeDestroy(expr, NULL); - - // return ret; -} - -// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** -// pFilterInfo, uint64_t qId) { -// *pFilterInfo = taosMemoryCalloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols); -// if (*pFilterInfo == NULL) { -// return TSDB_CODE_QRY_OUT_OF_MEMORY; -// } -// -// for (int32_t i = 0, j = 0; i < numOfCols; ++i) { -// if (pCols[i].flist.numOfFilters > 0) { -// SSingleColumnFilterInfo* pFilter = &((*pFilterInfo)[j]); -// -// memcpy(&pFilter->info, &pCols[i], sizeof(SColumnInfo)); -// pFilter->info = pCols[i]; -// -// pFilter->numOfFilters = pCols[i].flist.numOfFilters; -// pFilter->pFilters = taosMemoryCalloc(pFilter->numOfFilters, sizeof(SColumnFilterElem)); -// if (pFilter->pFilters == NULL) { -// return TSDB_CODE_QRY_OUT_OF_MEMORY; -// } -// -// for (int32_t f = 0; f < pFilter->numOfFilters; ++f) { -// SColumnFilterElem* pSingleColFilter = &pFilter->pFilters[f]; -// pSingleColFilter->filterInfo = pCols[i].flist.filterInfo[f]; -// -// int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr; -// int32_t upper = pSingleColFilter->filterInfo.upperRelOptr; -// if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) { -// //qError("QInfo:0x%"PRIx64" invalid filter info", qId); -// return TSDB_CODE_QRY_INVALID_MSG; -// } -// -// pSingleColFilter->fp = getFilterOperator(lower, upper); -// if (pSingleColFilter->fp == NULL) { -// //qError("QInfo:0x%"PRIx64" invalid filter info", qId); -// return TSDB_CODE_QRY_INVALID_MSG; -// } -// -// pSingleColFilter->bytes = pCols[i].bytes; -// -// if (lower == TSDB_RELATION_IN) { -//// buildFilterSetFromBinary(&pSingleColFilter->q, (char *)(pSingleColFilter->filterInfo.pz), -///(int32_t)(pSingleColFilter->filterInfo.len)); -// } -// } -// -// j++; -// } -// } -// -// return TSDB_CODE_SUCCESS; -//} - -void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) { - // for (int32_t i = 0; i < numOfFilterCols; ++i) { - // if (pFilterInfo[i].numOfFilters > 0) { - // if (pFilterInfo[i].pFilters->filterInfo.lowerRelOptr == TSDB_RELATION_IN) { - // taosHashCleanup((SHashObj *)(pFilterInfo[i].pFilters->q)); - // } - // taosMemoryFreeClear(pFilterInfo[i].pFilters); - // } - // } - // - // taosMemoryFreeClear(pFilterInfo); - return NULL; -} - -int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId) { - for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) { - // if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].flist.filterInfo != NULL) { - // pQueryAttr->numOfFilterCols++; - // } - } - - if (pQueryAttr->numOfFilterCols == 0) { - return TSDB_CODE_SUCCESS; - } - - // doCreateFilterInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols, pQueryAttr->numOfFilterCols, - // &pQueryAttr->pFilterInfo, qId); - - pQueryAttr->createFilterOperator = true; - - return TSDB_CODE_SUCCESS; -} - static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) { assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL);