diff --git a/include/libs/function/function.h b/include/libs/function/function.h index fe9edc323d20e636c431368a1b95a7f78990e1a4..5b4108ef970609de19ff91a8ae66d91baf5767f5 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -36,7 +36,7 @@ typedef struct SFuncExecEnv { typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); -typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx); +typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); @@ -154,7 +154,9 @@ typedef struct SResultDataInfo { int32_t interBufSize; } SResultDataInfo; -#define GET_RES_INFO(ctx) ((ctx)->resultInfo) +#define GET_RES_INFO(ctx) ((ctx)->resultInfo) +#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo))) +#define DATA_SET_FLAG ',' // to denote the output area has data, not null value typedef struct SInputColumnInfoData { int32_t totalRows; // total rows in current columnar data diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index 12a78134c3c13f708581452adafd7dd3832f9681..f947acf822b1e435368a4c7670fc9691b27d6f5e 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -1,17 +1,12 @@ aux_source_directory(src EXECUTOR_SRC) #add_library(executor ${EXECUTOR_SRC}) - -#target_link_libraries( -# executor -# PRIVATE os util common function parser planner qcom tsdb -#) - add_library(executor STATIC ${EXECUTOR_SRC}) #set_target_properties(executor PROPERTIES # IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a" # INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor" # ) + target_link_libraries(executor PRIVATE os util common function parser planner qcom vnode scalar nodes ) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5a3e25bd68bd2fd6aa5cb26ba734102d9b204e15..b16d75502ee898fa7806da6e6e02a82e6da5f5ef 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -385,6 +385,12 @@ typedef struct SExchangeInfo { SLoadRemoteDataInfo loadInfo; } SExchangeInfo; +typedef struct SColMatchInfo { + int32_t colId; + int32_t targetSlotId; + bool output; +} SColMatchInfo; + typedef struct STableScanInfo { void* dataReader; int32_t numOfBlocks; // extract basic running information. @@ -497,6 +503,7 @@ typedef struct SAggOperatorInfo { typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; + SAggSupporter aggSup; SSDataBlock *existDataBlock; int32_t threshold; SLimit limit; @@ -623,13 +630,28 @@ typedef struct SDistinctOperatorInfo { SHashObj* pSet; SSDataBlock* pRes; bool recordNullVal; // has already record the null value, no need to try again - int64_t threshold; - int64_t outputCapacity; - int32_t totalBytes; + int64_t threshold; // todo remove it + int64_t outputCapacity;// todo remove it + int32_t totalBytes; // todo remove it char* buf; SArray* pDistinctDataInfo; } SDistinctOperatorInfo; +int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); +void operatorDummyCloseFn(void* param, int32_t numOfCols); +int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); +int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, + int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey); +void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); +void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); +void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); +int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, + int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); +void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); +int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, + char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, + uint64_t* total, SArray* pColList); + SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo); @@ -647,12 +669,12 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput); +SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 23dde6b295d5a1ba743e5918f3c6b3b2fd99831f..12255189b3bd949b8b55b6c7547ff6452251c7aa 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,12 +13,11 @@ * along with this program. If not, see . */ -#include -#include -#include -#include -#include -#include +#include "filter.h" +#include "functionMgt.h" +#include "function.h" +#include "querynodes.h" +#include "tname.h" #include "os.h" #include "parser.h" @@ -29,7 +28,6 @@ #include "tsort.h" #include "ttime.h" -#include "../../function/inc/taggfunction.h" #include "executorimpl.h" #include "function.h" #include "query.h" @@ -47,7 +45,6 @@ #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) -#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SDATA_BLOCK_INITIALIZER \ (SDataBlockInfo) { {0}, 0 } @@ -67,11 +64,6 @@ typedef enum SResultTsInterpType { RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; -typedef struct SColMatchInfo { - int32_t colId; - int32_t targetSlotId; - bool output; -} SColMatchInfo; #if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { @@ -215,7 +207,6 @@ static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); -static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput); static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); @@ -239,19 +230,16 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) -static int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { +int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } -static void operatorDummyCloseFn(void* param, int32_t numOfCols) {} +void operatorDummyCloseFn(void* param, int32_t numOfCols) {} static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); -static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, - int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, - SAggSupporter* pAggSup); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow* win); @@ -984,7 +972,7 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, b ts[4] = pWin->ekey + delta; // window end key } -static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, +void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pWin->skey; @@ -1278,17 +1266,27 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { ASSERT(!fmIsAggFunc(pCtx->functionId)); - SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pBlockList, &pSrcBlock); + if (fmIsNonstandardSQLFunc(pCtx->functionId)) { + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); - SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, k); + SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]); + pCtx[k].fpSet.init(&pCtx[k], pResInfo); - scalarCalculate((SNode *)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - pResult->info.rows = dest.numOfRows; + pCtx[k].pOutput = pColInfoData->pData; + int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); + pResult->info.rows = numOfRows; + } else { + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); - taosArrayDestroy(pBlockList); + SScalarParam dest = {0}; + dest.columnData = taosArrayGet(pResult->pDataBlock, k); + + scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); + pResult->info.rows = dest.numOfRows; + taosArrayDestroy(pBlockList); + } } else { ASSERT(0); } @@ -1683,179 +1681,6 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); } -static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, - int32_t numOfGroupCols) { - SColumnDataAgg* pColAgg = NULL; - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); - if (pBlock->pBlockAgg != NULL) { - pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? - } - - bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); - - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); - if (pkey->isNull && isNull) { - continue; - } - - if (isNull || pkey->isNull) { - return false; - } - - char* val = colDataGetData(pColInfoData, rowIndex); - - if (IS_VAR_DATA_TYPE(pkey->type)) { - int32_t len = varDataLen(val); - if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) { - continue; - } else { - return false; - } - } else { - if (memcmp(pkey->pData, val, pkey->bytes) != 0) { - return false; - } - } - } - - return true; -} - -static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { - SColumnDataAgg* pColAgg = NULL; - - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); - - if (pBlock->pBlockAgg != NULL) { - pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? - } - - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); - if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { - pkey->isNull = true; - } else { - char* val = colDataGetData(pColInfoData, rowIndex); - if (IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, varDataTLen(val)); - } else { - memcpy(pkey->pData, val, pkey->bytes); - } - } - } -} - -static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) { - ASSERT(pKey != NULL); - size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); - - char* isNull = (char*)pKey; - char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); - if (pkey->isNull) { - isNull[i] = 1; - continue; - } - - isNull[i] = 0; - if (IS_VAR_DATA_TYPE(pkey->type)) { - varDataCopy(pStart, pkey->pData); - pStart += varDataTLen(pkey->pData); - ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); - } else { - memcpy(pStart, pkey->pData, pkey->bytes); - pStart += pkey->bytes; - } - } - - *length = (pStart - (char*)pKey); - return 0; -} - -// assign the group keys or user input constant values if required -static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) { - for (int32_t i = 0; i < numOfOutput; ++i) { - if (pCtx[i].functionId == -1) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]); - - SColumnInfoData* pColInfoData = pCtx[i].input.pData[0]; - if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) { - char* dest = GET_ROWCELL_INTERBUF(pEntryInfo); - char* data = colDataGetData(pColInfoData, rowIndex); - - // set result exists, todo refactor - memcpy(dest, data, pColInfoData->info.bytes); - pEntryInfo->hasResult = DATA_SET_FLAG; - pEntryInfo->numOfRes = 1; - } - } - } -} - -static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SGroupbyOperatorInfo* pInfo = pOperator->info; - - SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; - int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); - // if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { - // qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv)); - // return; - // } - - int32_t len = 0; - STimeWindow w = TSWINDOW_INITIALIZER; - - int32_t num = 0; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - // Compare with the previous row of this column, and do not set the output buffer again if they are identical. - if (!pInfo->isInit) { - keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); - pInfo->isInit = true; - num++; - continue; - } - - bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols); - if (equal) { - num++; - continue; - } - - /*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); - int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); - } - - int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); - - // assign the group keys or user input constant values if required - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); - keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); - num = 1; - } - - if (num > 0) { - /*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); - int32_t ret = - setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, - 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); - } - - int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); - } -} - static void doKeepTuple(SSessionAggOperatorInfo* pInfo, int64_t ts) { pInfo->curWindow.ekey = ts; pInfo->prevTs = ts; @@ -1949,7 +1774,7 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { } } -static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, +int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) { SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo; @@ -2083,7 +1908,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num SFuncExecEnv env = {0}; pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; - if (fmIsAggFunc(pCtx->functionId)) { + if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) { fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); } else { @@ -2748,70 +2573,6 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData return status; } -int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - STaskCostInfo* pCost = &pTaskInfo->cost; - - pCost->totalBlocks += 1; - pCost->totalRows += pBlock->info.rows; - - pCost->totalCheckedRows += pBlock->info.rows; - pCost->loadBlocks += 1; - - *status = BLK_DATA_ALL_NEEDED; - - SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); - if (pCols == NULL) { - return terrno; - } - - int32_t numOfCols = pBlock->info.numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p = taosArrayGet(pCols, i); - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { - continue; - } - - ASSERT(pColMatchInfo->colId == p->info.colId); - taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); - } - - if (pTableScanInfo->pFilterNode != NULL) { - SFilterInfo* filter = NULL; - int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0); - - SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; - code = filterSetDataFromSlotId(filter, ¶m1); - - int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); - - SSDataBlock* px = createOneDataBlock(pBlock); - blockDataEnsureCapacity(px, pBlock->info.rows); - - int32_t numOfRow = 0; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); - - numOfRow = 0; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - if (rowRes[j] == 0) { - continue; - } - - colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); - numOfRow += 1; - } - *pSrc = *pDst; - } - - pBlock->info.rows = numOfRow; - } - - return TSDB_CODE_SUCCESS; -} - int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { *status = BLK_DATA_NO_NEEDED; @@ -3186,35 +2947,6 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) } } -static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { -#if 0 - int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); - SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); - - size_t t = taosArrayGetSize(group); - for (int32_t j = 0; j < t; ++j) { - STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); - updateTableQueryInfoForReverseScan(pCheckInfo); - - // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide - // the start check timestamp of tsdbQueryHandle -// STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j); -// pTableKeyInfo->lastKey = pCheckInfo->lastKey; -// -// assert(pCheckInfo->pTable == pTableKeyInfo->pTable); - } - } -#endif -} - -void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { - for (int32_t i = 0; i < numOfOutput; ++i) { - SWITCH_ORDER(pCtx[i].order); - } -} - void initResultRow(SResultRow* pResultRow) { pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); } @@ -3344,27 +3076,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } } -static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { - // if (pRuntimeEnv->pTsBuf) { - // SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); - // bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); - // assert(ret); - // } - - // reverse order time range - SET_REVERSE_SCAN_FLAG(pTableScanInfo); - // setTaskStatus(pTableScanInfo, QUERY_NOT_COMPLETED); - - switchCtxOrder(pCtx, numOfOutput); - - SWITCH_ORDER(pTableScanInfo->order); - setupQueryRangeForReverseScan(pTableScanInfo); - - pTableScanInfo->times = 1; - pTableScanInfo->current = 0; - pTableScanInfo->reverseTimes = 0; -} - void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t j = 0; j < numOfOutput; ++j) { if (pCtx[j].functionId == -1) { @@ -3766,7 +3477,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI return 0; } -static void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, +void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); @@ -4353,237 +4064,6 @@ static void doCloseAllTimeWindow(STaskRuntimeEnv* pRuntimeEnv) { } } -static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { - STableScanInfo* pTableScanInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - SSDataBlock* pBlock = &pTableScanInfo->block; - STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo; - - *newgroup = false; - - while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - pTableScanInfo->numOfBlocks += 1; - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); - - // todo opt - // if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { - // STableQueryInfo** pTableQueryInfo = - // (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid)); - // if (pTableQueryInfo == NULL) { - // break; - // } - // - // doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order); - // } - - // this function never returns error? - uint32_t status = BLK_DATA_ALL_NEEDED; - int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status); - // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pOperator->pTaskInfo->env, code); - } - - // current block is ignored according to filter result by block statistics data, continue load the next block - if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) { - continue; - } - - return pBlock; - } - - return NULL; -} - -static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { - STableScanInfo* pTableScanInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - // The read handle is not initialized yet, since no qualified tables exists - if (pTableScanInfo->dataReader == NULL) { - return NULL; - } - - SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; - *newgroup = false; - - while (pTableScanInfo->current < pTableScanInfo->times) { - SSDataBlock* p = doTableScanImpl(pOperator, newgroup); - if (p != NULL) { - return p; - } - - if (++pTableScanInfo->current >= pTableScanInfo->times) { - if (pTableScanInfo->reverseTimes <= 0 /* || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)*/) { - return NULL; - } else { - break; - } - } - - // do prepare for the next round table scan operation - // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - // tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); - - setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTableScanInfo->scanFlag = REPEAT_SCAN; - - if (pResultRowInfo->size > 0) { - pResultRowInfo->curPos = 0; - } - - qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); - } - - SSDataBlock* p = NULL; - // todo refactor - if (pTableScanInfo->reverseTimes > 0) { - setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); - // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - // tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); - - qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); - - if (pResultRowInfo->size > 0) { - pResultRowInfo->curPos = pResultRowInfo->size - 1; - } - - p = doTableScanImpl(pOperator, newgroup); - } - - return p; -} - -static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - STableScanInfo* pTableScanInfo = pOperator->info; - *newgroup = false; - - STableBlockDistInfo tableBlockDist = {0}; - tableBlockDist.numOfTables = 1; // TODO set the correct number of tables - - int32_t numRowSteps = TSDB_DEFAULT_MAX_ROW_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS; - if (TSDB_DEFAULT_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) { - ++numRowSteps; - } - - tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo)); - taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps); - - tableBlockDist.maxRows = INT_MIN; - tableBlockDist.minRows = INT_MAX; - - tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist); - tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); - - SSDataBlock* pBlock = &pTableScanInfo->block; - pBlock->info.rows = 1; - pBlock->info.numOfCols = 1; - -// SBufferWriter bw = tbufInitWriter(NULL, false); -// blockDistInfoToBinary(&tableBlockDist, &bw); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); - -// int32_t len = (int32_t) tbufTell(&bw); -// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); -// *(int32_t*) pColInfo->pData = len; -// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); -// -// tbufCloseWriter(&bw); - -// SArray* g = GET_TABLEGROUP(pOperator->, 0); -// pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0); - - pOperator->status = OP_EXEC_DONE; - return pBlock; -} - -static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { - size_t total = taosArrayGetSize(pInfo->pBlockLists); - - pInfo->validBlockIndex = 0; - for (int32_t i = 0; i < total; ++i) { - SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i); - blockDataDestroy(p); - } - taosArrayClear(pInfo->pBlockLists); -} - -static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) { - // NOTE: this operator does never check if current status is done or not - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStreamBlockScanInfo* pInfo = pOperator->info; - - pTaskInfo->code = pOperator->_openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - return NULL; - } - - if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { - size_t total = taosArrayGetSize(pInfo->pBlockLists); - if (pInfo->validBlockIndex >= total) { - doClearBufferedBlocks(pInfo); - return NULL; - } - - int32_t current = pInfo->validBlockIndex++; - return taosArrayGetP(pInfo->pBlockLists, current); - } else { - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - blockDataCleanup(pInfo->pRes); - - while (tqNextDataBlock(pInfo->readerHandle)) { - pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - terrno = pTaskInfo->code; - return NULL; - } - - if (pBlockInfo->rows == 0) { - return NULL; - } - - SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); - - int32_t numOfCols = pInfo->pRes->info.numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p = taosArrayGet(pCols, i); - SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { - continue; - } - - ASSERT(pColMatchInfo->colId == p->info.colId); - taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p); - } - - if (pInfo->pRes->pDataBlock == NULL) { - // TODO add log - pTaskInfo->code = terrno; - return NULL; - } - - break; - } - - // record the scan action. - pInfo->numOfExec++; - pInfo->numOfRows += pBlockInfo->rows; - - return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; - } -} - int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param; if (code == TSDB_CODE_SUCCESS) { @@ -4669,7 +4149,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf } // TODO if only one or two columnss required, how to extract data? -static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, +int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { blockDataEnsureCapacity(pRes, numOfRows); @@ -5128,491 +4608,8 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { return pResBlock; } -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, - int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, - SNode* pCondition, SExecTaskInfo* pTaskInfo) { - assert(repeatTime > 0); - - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData idata = {0}; - taosArrayPush(pInfo->block.pDataBlock, &idata); - } - - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pTsdbReadHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; - pOperator->getNextFn = doTableScan; - pOperator->pTaskInfo = pTaskInfo; - - return pOperator; -} - -SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - - pInfo->dataReader = pTsdbReadHandle; - pInfo->times = 1; - pInfo->reverseTimes = 0; - pInfo->order = pRuntimeEnv->pQueryAttr->order.order; - pInfo->current = 0; - pInfo->prevGroupId = -1; - pRuntimeEnv->enableGroupData = true; - - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableSeqScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doTableScanImpl; - - return pOperator; -} - -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - pInfo->dataReader = dataReader; - pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); - - SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_BINARY; - infoData.info.bytes = 1024; - infoData.info.colId = 0; - taosArrayPush(pInfo->block.pDataBlock, &infoData); - - pOperator->name = "DataBlockInfoScanOperator"; - // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doBlockInfoScan; - - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - - return pOperator; - - _error: - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - return NULL; -} - -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { - SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - int32_t numOfOutput = taosArrayGetSize(pColList); - - SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); - for(int32_t i = 0; i < numOfOutput; ++i) { - int16_t* id = taosArrayGet(pColList, i); - taosArrayPush(pColIds, id); - } - - pInfo->pColMatchInfo = pColList; - - // set the extract column id to streamHandle - tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds); - int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); - if (code != 0) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - return NULL; - } - - pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); - if (pInfo->pBlockLists == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - return NULL; - } - - pInfo->readerHandle = streamReadHandle; - pInfo->pRes = pResBlock; - - pOperator->name = "StreamBlockScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doStreamBlockScan; - pOperator->closeFn = operatorDummyCloseFn; - pOperator->pTaskInfo = pTaskInfo; - - return pOperator; -} - -static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) { - SOperatorInfo* operator=(SOperatorInfo*) param; - SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info; - if (TSDB_CODE_SUCCESS == code) { - pScanResInfo->pRsp = pMsg->pData; - - SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp; - pRsp->numOfRows = htonl(pRsp->numOfRows); - pRsp->useconds = htobe64(pRsp->useconds); - pRsp->handle = htobe64(pRsp->handle); - pRsp->compLen = htonl(pRsp->compLen); - } else { - operator->pTaskInfo->code = code; - } - - tsem_post(&pScanResInfo->ready); -} - -static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { - if (pInfo->pCondition == NULL) { - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; - } - - SFilterInfo* filter = NULL; - int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0); - - SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock}; - code = filterSetDataFromSlotId(filter, ¶m1); - - int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); - - SSDataBlock* px = createOneDataBlock(pInfo->pRes); - blockDataEnsureCapacity(px, pInfo->pRes->info.rows); - - // TODO refactor - int32_t numOfRow = 0; - for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { - SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); - - numOfRow = 0; - for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { - if (rowRes[j] == 0) { - continue; - } - - colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); - numOfRow += 1; - } - } - - px->info.rows = numOfRow; - pInfo->pRes = px; - - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; -} - -EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { - int32_t code = TSDB_CODE_SUCCESS; - ENodeType nType = nodeType(pNode); - - switch (nType) { - case QUERY_NODE_OPERATOR: { - SOperatorNode* node = (SOperatorNode*)pNode; - - if (OP_TYPE_EQUAL == node->opType) { - *(int32_t*)pContext = 1; - return DEAL_RES_CONTINUE; - } - - *(int32_t*)pContext = 0; - - return DEAL_RES_IGNORE_CHILD; - } - case QUERY_NODE_COLUMN: { - if (1 != *(int32_t*)pContext) { - return DEAL_RES_CONTINUE; - } - - SColumnNode* node = (SColumnNode*)pNode; - if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) { - *(int32_t*)pContext = 2; - return DEAL_RES_CONTINUE; - } - - *(int32_t*)pContext = 0; - return DEAL_RES_CONTINUE; - } - case QUERY_NODE_VALUE: { - if (2 != *(int32_t*)pContext) { - return DEAL_RES_CONTINUE; - } - - SValueNode* node = (SValueNode*)pNode; - char* dbName = nodesGetValueFromNode(node); - strncpy(pContext, varDataVal(dbName), varDataLen(dbName)); - *((char*)pContext + varDataLen(dbName)) = 0; - return DEAL_RES_ERROR; // stop walk - } - default: - break; - } - - return DEAL_RES_CONTINUE; -} - -void getDBNameFromCondition(SNode* pCondition, char* dbName) { - if (NULL == pCondition) { - return; - } - - nodesWalkNode(pCondition, getDBNameFromConditionWalker, dbName); -} - -static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { - // build message and send to mnode to fetch the content of system tables. - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSysTableScanInfo* pInfo = pOperator->info; - - // retrieve local table list info from vnode - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { - if (pInfo->pCur == NULL) { - pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); - } - - blockDataCleanup(pInfo->pRes); - - int32_t tableNameSlotId = 1; - SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); - - char* name = NULL; - int32_t numOfRows = 0; - - char n[TSDB_TABLE_NAME_LEN] = {0}; - while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { - STR_TO_VARSTR(n, name); - colDataAppend(pTableNameCol, numOfRows, n, false); - numOfRows += 1; - if (numOfRows >= pInfo->capacity) { - break; - } - - for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { - if (i == tableNameSlotId) { - continue; - } - - SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); - int64_t tmp = 0; - char t[10] = {0}; - STR_TO_VARSTR(t, "_"); //TODO - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - colDataAppend(pColInfoData, numOfRows, t, false); - } else { - colDataAppend(pColInfoData, numOfRows, (char*)&tmp, false); - } - } - } - - pInfo->loadInfo.totalRows += numOfRows; - pInfo->pRes->info.rows = numOfRows; - - // pInfo->elapsedTime; - // pInfo->totalBytes; - return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; - } else { // load the meta from mnode of the given epset - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - int64_t startTs = taosGetTimestampUs(); - - pInfo->req.type = pInfo->type; - strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); - if (pInfo->showRewrite) { - char dbName[TSDB_DB_NAME_LEN] = {0}; - getDBNameFromCondition(pInfo->pCondition, dbName); - sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); - } - - int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); - char* buf1 = taosMemoryCalloc(1, contLen); - tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); - - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pMsgSendInfo->param = pOperator; - pMsgSendInfo->msgInfo.pData = buf1; - pMsgSendInfo->msgInfo.len = contLen; - pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; - pMsgSendInfo->fp = loadSysTableContentCb; - - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pInfo->ready); - - if (pTaskInfo->code) { - return NULL; - } - - SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; - pInfo->req.showId = pRsp->handle; - - if (pRsp->numOfRows == 0 || pRsp->completed) { - pOperator->status = OP_EXEC_DONE; - } - - if (pRsp->numOfRows == 0) { - // qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" - // try next", - // GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, - // pDataInfo->totalRows, pExchangeInfo->totalRows); - return NULL; - } - - SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; - setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, - pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); - - return doFilterResult(pInfo); - } - - return NULL; -} - -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SArray* colList, - SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) { - SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pInfo->accountId = accountId; - pInfo->showRewrite = showRewrite; - pInfo->pRes = pResBlock; - pInfo->capacity = 4096; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; - - // TODO remove it - int32_t tableType = 0; - const char* name = tNameGetTableName(pName); - if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_DB; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_USER; - } else if (strncasecmp(name, TSDB_INS_TABLE_DNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_DNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_MNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_MODULE; - } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_QNODE; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_FUNC; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, tListLen(pName->tname)) == 0) { - // tableType = TSDB_MGMT_TABLE_INDEX; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_STB; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_STREAMS; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_TABLE; - } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, tListLen(pName->tname)) == 0) { - tableType = TSDB_MGMT_TABLE_VGROUP; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, tListLen(pName->tname)) == 0) { - // tableType = TSDB_MGMT_TABLE_DIST; - } else { - ASSERT(0); - } - - tNameAssign(&pInfo->name, pName); - pInfo->type = tableType; - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { - pInfo->readHandle = pSysTableReadHandle; - blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); - } else { - tsem_init(&pInfo->ready, 0, 0); - pInfo->epSet = epset; - -#if 1 - { // todo refactor - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "DB-META"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = qProcessFetchRsp; - rpcInit.sessions = tsMaxConnections; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.user = (char*)"root"; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.secret = (char*)"dcc5bed04851fec854c035b2e40263b6"; - - pInfo->pTransporter = rpcOpen(&rpcInit); - if (pInfo->pTransporter == NULL) { - return NULL; // todo - } - } -#endif - } - - pOperator->name = "SysTableScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->getNextFn = doSysTableScan; - pOperator->closeFn = destroySysTableScannerOperatorInfo; - pOperator->pTaskInfo = pTaskInfo; - - return pOperator; -} - -static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey); -static void cleanupAggSup(SAggSupporter* pAggSup); +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey); +static void cleanupAggSup(SAggSupporter* pAggSup); static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param; @@ -6877,60 +5874,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } -static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SGroupbyOperatorInfo* pInfo = pOperator->info; - if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, - pInfo->binfo.rowCellInfoOffset); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - } - return pInfo->binfo.pRes; - } - - int32_t order = TSDB_ORDER_ASC; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - break; - } - - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); - doHashGroupbyAgg(pOperator, pBlock); - } - - pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pInfo->binfo.resultRowInfo); - - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, - &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - // if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows - // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); - // } else { - // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, - // pInfo->binfo.rowCellInfoOffset); - // } - - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); - initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, - pInfo->binfo.rowCellInfoOffset); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - } - - return pInfo->binfo.pRes; -} static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) { @@ -7101,7 +6044,7 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { destroyDiskbasedBuf(pAggSup->pResultBuf); } -static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, +int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey) { pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; @@ -7182,7 +6125,7 @@ _error: return NULL; } -static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { +void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { assert(pInfo != NULL); destroySqlFunctionCtx(pInfo->pCtx, numOfOutput); @@ -7226,14 +6169,6 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(pInfo->p); } -void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { - SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; - doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - taosMemoryFreeClear(pInfo->keyBuf); - taosArrayDestroy(pInfo->pGroupCols); - taosArrayDestroy(pInfo->pGroupColVals); -} - static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -7259,16 +6194,6 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) { - SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; - tsem_destroy(&pInfo->ready); - blockDataDestroy(pInfo->pRes); - - if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { - metaCloseTbCursor(pInfo->pCur); - } -} - void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosArrayDestroy(pExInfo->pSources); @@ -7330,12 +6255,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pInfo->limit = *pLimit; pInfo->curOffset = pLimit->offset; pInfo->binfo.pRes = pResBlock; - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset); - if (pInfo->binfo.pCtx == NULL) { - goto _error; - } - // initResultRowInfo(&pBInfo->resultRowInfo, 8); - // setFunctionResultOutput(pBInfo, MAIN_SCAN); + + int32_t numOfCols = num; + int32_t numOfRows = 4096; + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; @@ -7565,77 +6489,6 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun return pOperator; } -static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) { - pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pGroupColVals == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pGroupColList, i); - pInfo->groupKeyLen += pCol->bytes; - - struct SGroupKeys key = {0}; - key.bytes = pCol->bytes; - key.type = pCol->type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pCol->bytes); - if (key.pData == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(pInfo->pGroupColVals, &key); - } - - int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; - pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize); - - if (pInfo->keyBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - return TSDB_CODE_SUCCESS; -} - -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo) { - SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - goto _error; - } - - pInfo->pGroupCols = pGroupColList; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - - int32_t code = initGroupOptrInfo(pInfo, pGroupColList); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - pOperator->name = "GroupbyAggOperator"; - pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - // pOperator->operatorType = OP_Groupby; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; - pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupbyOperatorInfo; - - code = appendDownstream(pOperator, &downstream, 1); - return pOperator; - -_error: - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - return NULL; -} - static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal); @@ -7959,10 +6812,11 @@ static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { pRes->info.rows = 0; SSDataBlock* pBlock = NULL; + SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pDownstream->getNextFn(pDownstream, newgroup); + publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); @@ -7972,6 +6826,7 @@ static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { doSetOperatorCompleted(pOperator); break; } + // ensure result output buf if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { int32_t newSize = pRes->info.rows + pBlock->info.rows; @@ -8013,29 +6868,27 @@ static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; } -SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput) { +SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo)); - pInfo->totalBytes = 0; - pInfo->buf = NULL; - pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold - pInfo->outputCapacity = 4096; - pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + pOperator->resultInfo.capacity = 4096; // todo extract function. + + pInfo->totalBytes = 0; + pInfo->buf = NULL; + + pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo)); pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - // pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "DistinctOperator"; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - // pOperator->operatorType = OP_Distinct; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = hashDistinct; - pOperator->pExpr = pExpr; - pOperator->closeFn = destroyDistinctOperatorInfo; + pOperator->name = "DistinctOperator"; + pOperator->blockingOptr = true; + pOperator->status = OP_NOT_OPENED; +// pOperator->operatorType = DISTINCT; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->getNextFn = hashDistinct; + pOperator->closeFn = destroyDistinctOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c new file mode 100644 index 0000000000000000000000000000000000000000..90cb97f6c7ce3517ea179c9f1a120e231ffb0881 --- /dev/null +++ b/source/libs/executor/src/groupoperator.c @@ -0,0 +1,332 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "function.h" +#include "tname.h" + +#include "tdatablock.h" +#include "tmsg.h" + +#include "executorimpl.h" +#include "tcompare.h" +#include "thash.h" +#include "ttypes.h" + +static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { + SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + taosMemoryFreeClear(pInfo->keyBuf); + taosArrayDestroy(pInfo->pGroupCols); + taosArrayDestroy(pInfo->pGroupColVals); +} + +static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) { + pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); + if (pInfo->pGroupColVals == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = taosArrayGet(pGroupColList, i); + pInfo->groupKeyLen += pCol->bytes; + + struct SGroupKeys key = {0}; + key.bytes = pCol->bytes; + key.type = pCol->type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, pCol->bytes); + if (key.pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pInfo->pGroupColVals, &key); + } + + int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; + pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize); + + if (pInfo->keyBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, + int32_t numOfGroupCols) { + SColumnDataAgg* pColAgg = NULL; + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + if (pBlock->pBlockAgg != NULL) { + pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + } + + bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); + + SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + if (pkey->isNull && isNull) { + continue; + } + + if (isNull || pkey->isNull) { + return false; + } + + char* val = colDataGetData(pColInfoData, rowIndex); + + if (IS_VAR_DATA_TYPE(pkey->type)) { + int32_t len = varDataLen(val); + if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) { + continue; + } else { + return false; + } + } else { + if (memcmp(pkey->pData, val, pkey->bytes) != 0) { + return false; + } + } + } + + return true; +} + +static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { + SColumnDataAgg* pColAgg = NULL; + + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + + if (pBlock->pBlockAgg != NULL) { + pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + } + + SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + pkey->isNull = true; + } else { + char* val = colDataGetData(pColInfoData, rowIndex); + if (IS_VAR_DATA_TYPE(pkey->type)) { + memcpy(pkey->pData, val, varDataTLen(val)); + } else { + memcpy(pkey->pData, val, pkey->bytes); + } + } + } +} + +static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) { + ASSERT(pKey != NULL); + size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); + + char* isNull = (char*)pKey; + char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); + if (pkey->isNull) { + isNull[i] = 1; + continue; + } + + isNull[i] = 0; + if (IS_VAR_DATA_TYPE(pkey->type)) { + varDataCopy(pStart, pkey->pData); + pStart += varDataTLen(pkey->pData); + ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); + } else { + memcpy(pStart, pkey->pData, pkey->bytes); + pStart += pkey->bytes; + } + } + + *length = (pStart - (char*)pKey); + return 0; +} + +// assign the group keys or user input constant values if required +static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) { + for (int32_t i = 0; i < numOfOutput; ++i) { + if (pCtx[i].functionId == -1) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]); + + SColumnInfoData* pColInfoData = pCtx[i].input.pData[0]; + if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) { + char* dest = GET_ROWCELL_INTERBUF(pEntryInfo); + char* data = colDataGetData(pColInfoData, rowIndex); + + // set result exists, todo refactor + memcpy(dest, data, pColInfoData->info.bytes); + pEntryInfo->hasResult = DATA_SET_FLAG; + pEntryInfo->numOfRes = 1; + } + } + } +} + +static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SGroupbyOperatorInfo* pInfo = pOperator->info; + + SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; + int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); + // if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + // qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv)); + // return; + // } + + int32_t len = 0; + STimeWindow w = TSWINDOW_INITIALIZER; + + int32_t num = 0; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + // Compare with the previous row of this column, and do not set the output buffer again if they are identical. + if (!pInfo->isInit) { + keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); + pInfo->isInit = true; + num++; + continue; + } + + bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols); + if (equal) { + num++; + continue; + } + + /*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + } + + int32_t rowIndex = j - num; + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + + // assign the group keys or user input constant values if required + doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); + num = 1; + } + + if (num > 0) { + /*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + int32_t ret = + setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, + 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + } + + int32_t rowIndex = pBlock->info.rows - num; + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + } +} + +static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SGroupbyOperatorInfo* pInfo = pOperator->info; + if (pOperator->status == OP_RES_TO_RETURN) { + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, + pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + return pInfo->binfo.pRes; + } + + int32_t order = TSDB_ORDER_ASC; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + while (1) { + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { + break; + } + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + doHashGroupbyAgg(pOperator, pBlock); + } + + pOperator->status = OP_RES_TO_RETURN; + closeAllResultRows(&pInfo->binfo.resultRowInfo); + + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + // if (!stableQuery) { // finalize include the update of result rows + // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + // } else { + // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, + // pInfo->binfo.rowCellInfoOffset); + // } + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); + initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, + pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + + return pInfo->binfo.pRes; +} + +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo) { + SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->pGroupCols = pGroupColList; + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + + int32_t code = initGroupOptrInfo(pInfo, pGroupColList); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pOperator->name = "GroupbyAggOperator"; + pOperator->blockingOptr = true; + pOperator->status = OP_NOT_OPENED; + // pOperator->operatorType = OP_Groupby; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = hashGroupbyAggregate; + pOperator->closeFn = destroyGroupbyOperatorInfo; + + code = appendDownstream(pOperator, &downstream, 1); + return pOperator; + + _error: + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; +} \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c new file mode 100644 index 0000000000000000000000000000000000000000..70fa6e499552fb8a5115b5da9b9309d3cc35339a --- /dev/null +++ b/source/libs/executor/src/scanoperator.c @@ -0,0 +1,865 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tglobal.h" +#include "filter.h" +#include "function.h" +#include "os.h" +#include "querynodes.h" +#include "tname.h" +#include "vnode.h" + +#include "tdatablock.h" +#include "tmsg.h" + +#include "executorimpl.h" +#include "query.h" +#include "tcompare.h" +#include "thash.h" +#include "tsdb.h" +#include "ttypes.h" + +#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) +#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) + +void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { + for (int32_t i = 0; i < numOfOutput; ++i) { + SWITCH_ORDER(pCtx[i].order); + } +} + +static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { +#if 0 + int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); + for(int32_t i = 0; i < numOfGroups; ++i) { + SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); + SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); + + size_t t = taosArrayGetSize(group); + for (int32_t j = 0; j < t; ++j) { + STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); + updateTableQueryInfoForReverseScan(pCheckInfo); + + // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide + // the start check timestamp of tsdbQueryHandle +// STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j); +// pTableKeyInfo->lastKey = pCheckInfo->lastKey; +// +// assert(pCheckInfo->pTable == pTableKeyInfo->pTable); + } + } +#endif +} + +int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { + STaskCostInfo* pCost = &pTaskInfo->cost; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + + *status = BLK_DATA_ALL_NEEDED; + + SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); + if (pCols == NULL) { + return terrno; + } + + int32_t numOfCols = pBlock->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + + ASSERT(pColMatchInfo->colId == p->info.colId); + taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); + } + + if (pTableScanInfo->pFilterNode != NULL) { + SFilterInfo* filter = NULL; + int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0); + + SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; + code = filterSetDataFromSlotId(filter, ¶m1); + + int8_t* rowRes = NULL; + bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); + + SSDataBlock* px = createOneDataBlock(pBlock); + blockDataEnsureCapacity(px, pBlock->info.rows); + + int32_t numOfRow = 0; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); + + numOfRow = 0; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + if (rowRes[j] == 0) { + continue; + } + + colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); + numOfRow += 1; + } + *pSrc = *pDst; + } + + pBlock->info.rows = numOfRow; + } + + return TSDB_CODE_SUCCESS; +} + +static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { + // reverse order time range + SET_REVERSE_SCAN_FLAG(pTableScanInfo); + + switchCtxOrder(pCtx, numOfOutput); + SWITCH_ORDER(pTableScanInfo->order); + setupQueryRangeForReverseScan(pTableScanInfo); + + pTableScanInfo->times = 1; + pTableScanInfo->current = 0; + pTableScanInfo->reverseTimes = 0; +} + +static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { + STableScanInfo* pTableScanInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SSDataBlock* pBlock = &pTableScanInfo->block; + STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo; + + *newgroup = false; + + while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + pTableScanInfo->numOfBlocks += 1; + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); + + // todo opt + // if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { + // STableQueryInfo** pTableQueryInfo = + // (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.uid, sizeof(pBlock->info.uid)); + // if (pTableQueryInfo == NULL) { + // break; + // } + // + // doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order); + // } + + // this function never returns error? + uint32_t status = BLK_DATA_ALL_NEEDED; + int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status); + // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pOperator->pTaskInfo->env, code); + } + + // current block is ignored according to filter result by block statistics data, continue load the next block + if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) { + continue; + } + + return pBlock; + } + + return NULL; +} + +static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { + STableScanInfo* pTableScanInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + // The read handle is not initialized yet, since no qualified tables exists + if (pTableScanInfo->dataReader == NULL) { + return NULL; + } + + SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; + *newgroup = false; + + while (pTableScanInfo->current < pTableScanInfo->times) { + SSDataBlock* p = doTableScanImpl(pOperator, newgroup); + if (p != NULL) { + return p; + } + + if (++pTableScanInfo->current >= pTableScanInfo->times) { + if (pTableScanInfo->reverseTimes <= 0 /* || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)*/) { + return NULL; + } else { + break; + } + } + + // do prepare for the next round table scan operation + // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); + // tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); + + setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTableScanInfo->scanFlag = REPEAT_SCAN; + + if (pResultRowInfo->size > 0) { + pResultRowInfo->curPos = 0; + } + + qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); + } + + SSDataBlock* p = NULL; + // todo refactor + if (pTableScanInfo->reverseTimes > 0) { + setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); + // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); + // tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); + + qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); + + if (pResultRowInfo->size > 0) { + pResultRowInfo->curPos = pResultRowInfo->size - 1; + } + + p = doTableScanImpl(pOperator, newgroup); + } + + return p; +} + +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, + int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, + SNode* pCondition, SExecTaskInfo* pTaskInfo) { + assert(repeatTime > 0); + + STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for (int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData idata = {0}; + taosArrayPush(pInfo->block.pDataBlock, &idata); + } + + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pTsdbReadHandle; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; + pOperator->name = "TableScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; + pOperator->getNextFn = doTableScan; + pOperator->pTaskInfo = pTaskInfo; + + return pOperator; +} + +SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { + STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); + + pInfo->dataReader = pTsdbReadHandle; + pInfo->times = 1; + pInfo->reverseTimes = 0; + pInfo->order = pRuntimeEnv->pQueryAttr->order.order; + pInfo->current = 0; + pInfo->prevGroupId = -1; + pRuntimeEnv->enableGroupData = true; + + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + pOperator->name = "TableSeqScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->getNextFn = doTableScanImpl; + + return pOperator; +} + +static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + STableScanInfo* pTableScanInfo = pOperator->info; + *newgroup = false; + + STableBlockDistInfo tableBlockDist = {0}; + tableBlockDist.numOfTables = 1; // TODO set the correct number of tables + + int32_t numRowSteps = TSDB_DEFAULT_MAX_ROW_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS; + if (TSDB_DEFAULT_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) { + ++numRowSteps; + } + + tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo)); + taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps); + + tableBlockDist.maxRows = INT_MIN; + tableBlockDist.minRows = INT_MAX; + + tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist); + tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); + + SSDataBlock* pBlock = &pTableScanInfo->block; + pBlock->info.rows = 1; + pBlock->info.numOfCols = 1; + +// SBufferWriter bw = tbufInitWriter(NULL, false); +// blockDistInfoToBinary(&tableBlockDist, &bw); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + +// int32_t len = (int32_t) tbufTell(&bw); +// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); +// *(int32_t*) pColInfo->pData = len; +// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); +// +// tbufCloseWriter(&bw); + +// SArray* g = GET_TABLEGROUP(pOperator->, 0); +// pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0); + + pOperator->status = OP_EXEC_DONE; + return pBlock; +} + +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { + STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + pInfo->dataReader = dataReader; + pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_BINARY; + infoData.info.bytes = 1024; + infoData.info.colId = 0; + taosArrayPush(pInfo->block.pDataBlock, &infoData); + + pOperator->name = "DataBlockInfoScanOperator"; + // pOperator->operatorType = OP_TableBlockInfoScan; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doBlockInfoScan; + + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + return pOperator; + + _error: + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; +} + +static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + + pInfo->validBlockIndex = 0; + for (int32_t i = 0; i < total; ++i) { + SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i); + blockDataDestroy(p); + } + taosArrayClear(pInfo->pBlockLists); +} + +static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) { + // NOTE: this operator does never check if current status is done or not + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamBlockScanInfo* pInfo = pOperator->info; + + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + + if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + if (pInfo->validBlockIndex >= total) { + doClearBufferedBlocks(pInfo); + return NULL; + } + + int32_t current = pInfo->validBlockIndex++; + return taosArrayGetP(pInfo->pBlockLists, current); + } else { + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + blockDataCleanup(pInfo->pRes); + + while (tqNextDataBlock(pInfo->readerHandle)) { + pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + terrno = pTaskInfo->code; + return NULL; + } + + if (pBlockInfo->rows == 0) { + return NULL; + } + + SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); + + int32_t numOfCols = pInfo->pRes->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + + ASSERT(pColMatchInfo->colId == p->info.colId); + taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p); + } + + if (pInfo->pRes->pDataBlock == NULL) { + // TODO add log + pTaskInfo->code = terrno; + return NULL; + } + + break; + } + + // record the scan action. + pInfo->numOfExec++; + pInfo->numOfRows += pBlockInfo->rows; + + return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; + } +} + +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { + SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + int32_t numOfOutput = taosArrayGetSize(pColList); + + SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); + for(int32_t i = 0; i < numOfOutput; ++i) { + int16_t* id = taosArrayGet(pColList, i); + taosArrayPush(pColIds, id); + } + + pInfo->pColMatchInfo = pColList; + + // set the extract column id to streamHandle + tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds); + int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); + if (code != 0) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; + } + + pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); + if (pInfo->pBlockLists == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; + } + + pInfo->readerHandle = streamReadHandle; + pInfo->pRes = pResBlock; + + pOperator->name = "StreamBlockScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doStreamBlockScan; + pOperator->closeFn = operatorDummyCloseFn; + pOperator->pTaskInfo = pTaskInfo; + + return pOperator; +} + +static void destroySysScanOperator(void* param, int32_t numOfOutput) { + SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; + tsem_destroy(&pInfo->ready); + blockDataDestroy(pInfo->pRes); + + if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + metaCloseTbCursor(pInfo->pCur); + } +} + +EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { + int32_t code = TSDB_CODE_SUCCESS; + ENodeType nType = nodeType(pNode); + + switch (nType) { + case QUERY_NODE_OPERATOR: { + SOperatorNode* node = (SOperatorNode*)pNode; + + if (OP_TYPE_EQUAL == node->opType) { + *(int32_t*)pContext = 1; + return DEAL_RES_CONTINUE; + } + + *(int32_t*)pContext = 0; + + return DEAL_RES_IGNORE_CHILD; + } + case QUERY_NODE_COLUMN: { + if (1 != *(int32_t*)pContext) { + return DEAL_RES_CONTINUE; + } + + SColumnNode* node = (SColumnNode*)pNode; + if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) { + *(int32_t*)pContext = 2; + return DEAL_RES_CONTINUE; + } + + *(int32_t*)pContext = 0; + return DEAL_RES_CONTINUE; + } + case QUERY_NODE_VALUE: { + if (2 != *(int32_t*)pContext) { + return DEAL_RES_CONTINUE; + } + + SValueNode* node = (SValueNode*)pNode; + char* dbName = nodesGetValueFromNode(node); + strncpy(pContext, varDataVal(dbName), varDataLen(dbName)); + *((char*)pContext + varDataLen(dbName)) = 0; + return DEAL_RES_ERROR; // stop walk + } + default: + break; + } + + return DEAL_RES_CONTINUE; +} + +void getDBNameFromCondition(SNode* pCondition, char* dbName) { + if (NULL == pCondition) { + return; + } + + nodesWalkNode(pCondition, getDBNameFromConditionWalker, dbName); +} + +static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) { + SOperatorInfo* operator=(SOperatorInfo*) param; + SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info; + if (TSDB_CODE_SUCCESS == code) { + pScanResInfo->pRsp = pMsg->pData; + + SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp; + pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->handle = htobe64(pRsp->handle); + pRsp->compLen = htonl(pRsp->compLen); + } else { + operator->pTaskInfo->code = code; + } + + tsem_post(&pScanResInfo->ready); +} + +static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { + if (pInfo->pCondition == NULL) { + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; + } + + SFilterInfo* filter = NULL; + int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0); + + SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock}; + code = filterSetDataFromSlotId(filter, ¶m1); + + int8_t* rowRes = NULL; + bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + + SSDataBlock* px = createOneDataBlock(pInfo->pRes); + blockDataEnsureCapacity(px, pInfo->pRes->info.rows); + + // TODO refactor + int32_t numOfRow = 0; + for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { + SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); + + numOfRow = 0; + for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { + if (rowRes[j] == 0) { + continue; + } + + colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); + numOfRow += 1; + } + } + + px->info.rows = numOfRow; + pInfo->pRes = px; + + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; +} + +static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { + // build message and send to mnode to fetch the content of system tables. + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSysTableScanInfo* pInfo = pOperator->info; + + // retrieve local table list info from vnode + if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + if (pInfo->pCur == NULL) { + pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); + } + + blockDataCleanup(pInfo->pRes); + + int32_t tableNameSlotId = 1; + SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, tableNameSlotId); + + char* name = NULL; + int32_t numOfRows = 0; + + char n[TSDB_TABLE_NAME_LEN] = {0}; + while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { + STR_TO_VARSTR(n, name); + colDataAppend(pTableNameCol, numOfRows, n, false); + numOfRows += 1; + if (numOfRows >= pInfo->capacity) { + break; + } + + for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { + if (i == tableNameSlotId) { + continue; + } + + SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); + int64_t tmp = 0; + char t[10] = {0}; + STR_TO_VARSTR(t, "_"); //TODO + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + colDataAppend(pColInfoData, numOfRows, t, false); + } else { + colDataAppend(pColInfoData, numOfRows, (char*)&tmp, false); + } + } + } + + pInfo->loadInfo.totalRows += numOfRows; + pInfo->pRes->info.rows = numOfRows; + + // pInfo->elapsedTime; + // pInfo->totalBytes; + return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; + } else { // load the meta from mnode of the given epset + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + int64_t startTs = taosGetTimestampUs(); + + pInfo->req.type = pInfo->type; + strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); + if (pInfo->showRewrite) { + char dbName[TSDB_DB_NAME_LEN] = {0}; + getDBNameFromCondition(pInfo->pCondition, dbName); + sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); + } + + int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); + char* buf1 = taosMemoryCalloc(1, contLen); + tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pMsgSendInfo->param = pOperator; + pMsgSendInfo->msgInfo.pData = buf1; + pMsgSendInfo->msgInfo.len = contLen; + pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; + pMsgSendInfo->fp = loadSysTableContentCb; + + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); + tsem_wait(&pInfo->ready); + + if (pTaskInfo->code) { + return NULL; + } + + SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; + pInfo->req.showId = pRsp->handle; + + if (pRsp->numOfRows == 0 || pRsp->completed) { + pOperator->status = OP_EXEC_DONE; + } + + if (pRsp->numOfRows == 0) { + // qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" + // try next", + // GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, + // pDataInfo->totalRows, pExchangeInfo->totalRows); + return NULL; + } + + SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; + setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, + pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); + + return doFilterResult(pInfo); + } + + return NULL; +} + +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, + SNode* pCondition, SEpSet epset, SArray* colList, + SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) { + SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pInfo->accountId = accountId; + pInfo->showRewrite = showRewrite; + pInfo->pRes = pResBlock; + pInfo->capacity = 4096; + pInfo->pCondition = pCondition; + pInfo->scanCols = colList; + + // TODO remove it + int32_t tableType = 0; + const char* name = tNameGetTableName(pName); + if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_DB; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_USER; + } else if (strncasecmp(name, TSDB_INS_TABLE_DNODES, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_DNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_MNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_MODULE; + } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_QNODE; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_FUNC; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, tListLen(pName->tname)) == 0) { + // tableType = TSDB_MGMT_TABLE_INDEX; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_STB; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_STREAMS; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_TABLE; + } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, tListLen(pName->tname)) == 0) { + tableType = TSDB_MGMT_TABLE_VGROUP; + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, tListLen(pName->tname)) == 0) { + // tableType = TSDB_MGMT_TABLE_DIST; + } else { + ASSERT(0); + } + + tNameAssign(&pInfo->name, pName); + pInfo->type = tableType; + if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { + pInfo->readHandle = pSysTableReadHandle; + blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); + } else { + tsem_init(&pInfo->ready, 0, 0); + pInfo->epSet = epset; + +#if 1 + { // todo refactor + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "DB-META"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = qProcessFetchRsp; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = (char*)"root"; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.secret = (char*)"dcc5bed04851fec854c035b2e40263b6"; + + pInfo->pTransporter = rpcOpen(&rpcInit); + if (pInfo->pTransporter == NULL) { + return NULL; // todo + } + } +#endif + } + + pOperator->name = "SysTableScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->getNextFn = doSysTableScan; + pOperator->closeFn = destroySysScanOperator; + pOperator->pTaskInfo = pTaskInfo; + + return pOperator; +} diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 045bc9551525f680bb44c46743e0d21e625081f1..607bd279c15137550c5f3d738d463670b910e8fd 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -26,32 +26,34 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); void functionFinalize(SqlFunctionCtx *pCtx); bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -void countFunction(SqlFunctionCtx *pCtx); +int32_t countFunction(SqlFunctionCtx *pCtx); bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -void sumFunction(SqlFunctionCtx *pCtx); +int32_t sumFunction(SqlFunctionCtx *pCtx); bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -void minFunction(SqlFunctionCtx* pCtx); -void maxFunction(SqlFunctionCtx *pCtx); +int32_t minFunction(SqlFunctionCtx* pCtx); +int32_t maxFunction(SqlFunctionCtx *pCtx); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -void stddevFunction(SqlFunctionCtx* pCtx); +int32_t stddevFunction(SqlFunctionCtx* pCtx); void stddevFinalize(SqlFunctionCtx* pCtx); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -void percentileFunction(SqlFunctionCtx *pCtx); +int32_t percentileFunction(SqlFunctionCtx *pCtx); void percentileFinalize(SqlFunctionCtx* pCtx); -bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -void firstFunction(SqlFunctionCtx *pCtx); -void lastFunction(SqlFunctionCtx *pCtx); +bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); +int32_t diffFunction(SqlFunctionCtx *pCtx); -void valFunction(SqlFunctionCtx *pCtx); +bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t firstFunction(SqlFunctionCtx *pCtx); +int32_t lastFunction(SqlFunctionCtx *pCtx); #ifdef __cplusplus } diff --git a/source/libs/function/inc/taggfunction.h b/source/libs/function/inc/taggfunction.h index 906d4f63fb72470a5f4c928ad2fac64263b673bf..0697d309ac4a64948d3e41f09aab9e3ab18dbbe2 100644 --- a/source/libs/function/inc/taggfunction.h +++ b/source/libs/function/inc/taggfunction.h @@ -52,8 +52,6 @@ typedef struct SInterpInfoDetail { int8_t primaryCol; } SInterpInfoDetail; -#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo))) - typedef struct STwaInfo { int8_t hasResult; // flag to denote has value double dOutput; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a315c6c8a2b40bf7eebe27fdb943a250d2a5818b..755bbad4ee6d1b80ca6c1b5944486722aeffc2db 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -63,74 +63,74 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = functionFinalize }, { - .name = "stddev", - .type = FUNCTION_TYPE_STDDEV, - .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getStddevFuncEnv, - .initFunc = stddevFunctionSetup, - .processFunc = stddevFunction, - .finalizeFunc = stddevFinalize - }, - { - .name = "percentile", - .type = FUNCTION_TYPE_PERCENTILE, - .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getPercentileFuncEnv, - .initFunc = percentileFunctionSetup, - .processFunc = percentileFunction, - .finalizeFunc = functionFinalize - }, - { - .name = "apercentile", - .type = FUNCTION_TYPE_APERCENTILE, - .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize - }, - { - .name = "top", - .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize - }, - { - .name = "bottom", - .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize - }, - { - .name = "spread", - .type = FUNCTION_TYPE_SPREAD, - .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize - }, - { - .name = "last_row", - .type = FUNCTION_TYPE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .name = "stddev", + .type = FUNCTION_TYPE_STDDEV, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getStddevFuncEnv, + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunction, + .finalizeFunc = stddevFinalize + }, + { + .name = "percentile", + .type = FUNCTION_TYPE_PERCENTILE, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getPercentileFuncEnv, + .initFunc = percentileFunctionSetup, + .processFunc = percentileFunction, + .finalizeFunc = percentileFinalize + }, + { + .name = "apercentile", + .type = FUNCTION_TYPE_APERCENTILE, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "top", + .type = FUNCTION_TYPE_TOP, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "bottom", + .type = FUNCTION_TYPE_BOTTOM, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "spread", + .type = FUNCTION_TYPE_SPREAD, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize + }, + { + .name = "last_row", + .type = FUNCTION_TYPE_LAST_ROW, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getMinmaxFuncEnv, + .initFunc = maxFunctionSetup, + .processFunc = maxFunction, + .finalizeFunc = functionFinalize }, { .name = "first", @@ -152,6 +152,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = lastFunction, .finalizeFunc = functionFinalize }, + { + .name = "diff", + .type = FUNCTION_TYPE_DIFF, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getDiffFuncEnv, + .initFunc = diffFunctionSetup, + .processFunc = diffFunction, + .finalizeFunc = functionFinalize + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, @@ -379,9 +389,11 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { switch(pFunc->funcType) { case FUNCTION_TYPE_WDURATION: - case FUNCTION_TYPE_COUNT: + case FUNCTION_TYPE_COUNT: { pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; break; + } + case FUNCTION_TYPE_SUM: { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); int32_t paraType = pParam->node.resType.type; @@ -400,6 +412,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType }; break; } + + case FUNCTION_TYPE_DIFF: case FUNCTION_TYPE_FIRST: case FUNCTION_TYPE_LAST: case FUNCTION_TYPE_MIN: @@ -409,6 +423,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType }; break; } + case FUNCTION_TYPE_CONCAT: case FUNCTION_TYPE_ROWTS: case FUNCTION_TYPE_TBNAME: { @@ -416,10 +431,10 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } - case FUNCTION_TYPE_QENDTS: case FUNCTION_TYPE_QSTARTTS: - case FUNCTION_TYPE_WENDTS: - case FUNCTION_TYPE_WSTARTTS: { + case FUNCTION_TYPE_QENDTS: + case FUNCTION_TYPE_WSTARTTS: + case FUNCTION_TYPE_WENDTS:{ pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; break; } @@ -448,6 +463,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE }; break; } + case FUNCTION_TYPE_NOW: // todo break; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index a1c5dbce7b5eb033a1ac549af2c26d1713580897..10f23d586614e395d42f902b7822afd534d09f7d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -65,7 +65,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { * count function does need the finalize, if data is missing, the default value, which is 0, is used * count function does not use the pCtx->interResBuf to keep the intermediate buffer */ -void countFunction(SqlFunctionCtx *pCtx) { +int32_t countFunction(SqlFunctionCtx *pCtx) { int32_t numOfElem = 0; /* @@ -111,7 +111,7 @@ void countFunction(SqlFunctionCtx *pCtx) { } \ } while (0) -void sumFunction(SqlFunctionCtx *pCtx) { +int32_t sumFunction(SqlFunctionCtx *pCtx) { int32_t numOfElem = 0; // Only the pre-computing information loaded and actual data does not loaded @@ -432,12 +432,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { return numOfElems; } -void minFunction(SqlFunctionCtx *pCtx) { +int32_t minFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); } -void maxFunction(SqlFunctionCtx *pCtx) { +int32_t maxFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 0); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); } @@ -475,7 +475,7 @@ bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) return true; } -void stddevFunction(SqlFunctionCtx* pCtx) { +int32_t stddevFunction(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; // Only the pre-computing information loaded and actual data does not loaded @@ -627,7 +627,7 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI return true; } -void percentileFunction(SqlFunctionCtx *pCtx) { +int32_t percentileFunction(SqlFunctionCtx *pCtx) { int32_t notNullElems = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); @@ -644,7 +644,7 @@ void percentileFunction(SqlFunctionCtx *pCtx) { // all data are null, set it completed if (pInfo->numOfElems == 0) { pResInfo->complete = true; - return; + return 0; } else { pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); } @@ -698,7 +698,7 @@ void percentileFunction(SqlFunctionCtx *pCtx) { } } - return; + return 0; } // the second stage, calculate the true percentile value @@ -718,6 +718,7 @@ void percentileFunction(SqlFunctionCtx *pCtx) { pResInfo->hasResult = DATA_SET_FLAG; } +// TODO set the correct parameter. void percentileFinalize(SqlFunctionCtx* pCtx) { double v = 50;//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; @@ -741,9 +742,9 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { // TODO fix this // This ordinary first function only handle the data block in ascending order -void firstFunction(SqlFunctionCtx *pCtx) { +int32_t firstFunction(SqlFunctionCtx *pCtx) { if (pCtx->order == TSDB_ORDER_DESC) { - return; + return 0; } int32_t numOfElems = 0; @@ -757,7 +758,7 @@ void firstFunction(SqlFunctionCtx *pCtx) { // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); - return; + return 0; } // Check for the first not null data @@ -784,9 +785,9 @@ void firstFunction(SqlFunctionCtx *pCtx) { SET_VAL(pResInfo, numOfElems, 1); } -void lastFunction(SqlFunctionCtx *pCtx) { +int32_t lastFunction(SqlFunctionCtx *pCtx) { if (pCtx->order != TSDB_ORDER_DESC) { - return; + return 0; } int32_t numOfElems = 0; @@ -795,13 +796,12 @@ void lastFunction(SqlFunctionCtx *pCtx) { char* buf = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; // All null data column, return directly. if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { ASSERT(pInputCol->hasNull == true); - return; + return 0; } if (pCtx->order == TSDB_ORDER_DESC) { @@ -846,10 +846,225 @@ void lastFunction(SqlFunctionCtx *pCtx) { SET_VAL(pResInfo, numOfElems, 1); } -void valFunction(SqlFunctionCtx *pCtx) { +typedef struct SDiffInfo { + bool valueAssigned; + bool ignoreNegative; + union { int64_t i64; double d64;} prev; +} SDiffInfo; + +bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SDiffInfo); + return true; +} + +bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { + if (!functionSetup(pCtx, pResInfo)) { + return false; + } + + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + pDiffInfo->valueAssigned = false; + pDiffInfo->prev.i64 = 0; + pDiffInfo->ignoreNegative = false; // TODO set correct param + return true; +} + +int32_t diffFunction(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SDiffInfo *pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + bool isFirstBlock = (pDiffInfo->valueAssigned == false); + int32_t numOfElems = 0; + + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); +// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; + + TSKEY* pTimestamp = pCtx->ptsOutputBuf; + TSKEY* tsList = GET_TS_LIST(pCtx); + + switch (pInputCol->info.type) { + case TSDB_DATA_TYPE_INT: { + int32_t *pOutput = (int32_t *)pCtx->pOutput; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + + int32_t v = *(int32_t*) colDataGetData(pInputCol, i); + if (pDiffInfo->valueAssigned) { + int64_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null + if (pDiffInfo->ignoreNegative) { + continue; + } + + *(pOutput++) = delta; +// *pTimestamp = (tsList != NULL)? tsList[i]:0; + pTimestamp += 1; + } + + pDiffInfo->prev.i64 = v; + pDiffInfo->valueAssigned = true; + numOfElems++; + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *pOutput = (int64_t *)pCtx->pOutput; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + + int32_t v = 0; + if (pDiffInfo->valueAssigned) { + v = *(int64_t*) colDataGetData(pInputCol, i); + int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null + if (pDiffInfo->ignoreNegative) { + continue; + } + + *(pOutput++) = delta; + *pTimestamp = (tsList != NULL)? tsList[i]:0; + + pOutput += 1; + pTimestamp += 1; + } + + pDiffInfo->prev.i64 = v; + pDiffInfo->valueAssigned = true; + numOfElems++; + } + break; + } +#if 0 + case TSDB_DATA_TYPE_DOUBLE: { + double *pData = (double *)data; + double *pOutput = (double *)pCtx->pOutput; + + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { + continue; + } + if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { + continue; + } - SColumnInfoData* pInputCol = pCtx->input.pData[0]; - memcpy(buf, pInputCol->pData, pInputCol->info.bytes); + if (pDiffInfo->valueAssigned) { // initial value is not set yet + SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null + *pTimestamp = (tsList != NULL)? tsList[i]:0; + pOutput += 1; + pTimestamp += 1; + } + + pDiffInfo->d64Prev = pData[i]; + pDiffInfo->valueAssigned = true; + numOfElems++; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *pData = (float *)data; + float *pOutput = (float *)pCtx->pOutput; + + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { + continue; + } + if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { + continue; + } + + if (pDiffInfo->valueAssigned) { // initial value is not set yet + *pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null + *pTimestamp = (tsList != NULL)? tsList[i]:0; + pOutput += 1; + pTimestamp += 1; + } + + pDiffInfo->d64Prev = pData[i]; + pDiffInfo->valueAssigned = true; + numOfElems++; + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *pData = (int16_t *)data; + int16_t *pOutput = (int16_t *)pCtx->pOutput; + + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { + continue; + } + if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { + continue; + } + + if (pDiffInfo->valueAssigned) { // initial value is not set yet + *pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null + *pTimestamp = (tsList != NULL)? tsList[i]:0; + pOutput += 1; + pTimestamp += 1; + } + + pDiffInfo->i64Prev = pData[i]; + pDiffInfo->valueAssigned = true; + numOfElems++; + } + break; + } + + case TSDB_DATA_TYPE_TINYINT: { + int8_t *pData = (int8_t *)data; + int8_t *pOutput = (int8_t *)pCtx->pOutput; + + for (; i < pCtx->size && i >= 0; i += step) { + if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) { + continue; + } + if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { + continue; + } + + if (pDiffInfo->valueAssigned) { // initial value is not set yet + *pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null + *pTimestamp = (tsList != NULL)? tsList[i]:0; + pOutput += 1; + pTimestamp += 1; + } + + pDiffInfo->i64Prev = pData[i]; + pDiffInfo->valueAssigned = true; + numOfElems++; + } + break; + } +#endif + default: + break; +// qError("error input type"); + } + + // initial value is not set yet + if (!pDiffInfo->valueAssigned || numOfElems <= 0) { + /* + * 1. current block and blocks before are full of null + * 2. current block may be null value + */ + assert(pCtx->hasNull); + } else { +// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) { +// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t]; +// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) { +// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx); +// } +// } + + int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems; + return forwardStep; +// pResInfo->numOfRes += forwardStep; + } } + diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 4034a0eb0fb6536888161b9b7fc5ebc57736df20..7296805421ee23619fd26e04578dd2c07097e711 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -112,6 +112,11 @@ bool fmIsWindowClauseFunc(int32_t funcId) { return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId); } +bool fmIsNonstandardSQLFunc(int32_t funcId) { + return isSpecificClassifyFunc(funcId, FUNC_MGT_NONSTANDARD_SQL_FUNC); +} + + void fmFuncMgtDestroy() { void* m = gFunMgtService.pFuncNameHashTable; if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {