diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 65ddc180d66e6ab8c56aec68ce2fb5064c104846..60c7b18367ea9bc90c441ab005f85931106aecf3 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -78,7 +78,6 @@ enum { MAIN_SCAN = 0x0u, REVERSE_SCAN = 0x1u, // todo remove it REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan - MERGE_STAGE = 0x20u, }; typedef struct SPoint1 { @@ -156,11 +155,6 @@ typedef struct SqlFunctionCtx { char udfName[TSDB_FUNC_NAME_LEN]; } SqlFunctionCtx; -enum { - TEXPR_BINARYEXPR_NODE = 0x1, - TEXPR_UNARYEXPR_NODE = 0x2, -}; - typedef struct tExprNode { int32_t nodeType; union { @@ -182,8 +176,9 @@ struct SScalarParam { SColumnInfoData *columnData; SHashObj *pHashFilter; int32_t hashValueType; - void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value + void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value int32_t numOfRows; + int32_t numOfQualified; // number of qualified elements in the final results }; void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell); @@ -201,8 +196,6 @@ int32_t taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // udf api -struct SUdfInfo; - /** * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf * @return error code @@ -226,6 +219,7 @@ int32_t udfStartUdfd(int32_t startDnodeId); * @return */ int32_t udfStopUdfd(); + #ifdef __cplusplus } #endif diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index 1f1d9dea933affc01394111f5d5c3c4082b32cfb..e02b4a617253c7ade77a922568b478eaf62e0d9a 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -31,13 +31,17 @@ enum { FLT_OPTION_NEED_UNIQE = 4, }; +#define FILTER_RESULT_ALL_QUALIFIED 0x1 +#define FILTER_RESULT_NONE_QUALIFIED 0x2 +#define FILTER_RESULT_PARTIAL_QUALIFIED 0x3 + typedef struct SFilterColumnParam { int32_t numOfCols; SArray *pDataBlock; } SFilterColumnParam; extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options); -extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols); +extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* pFilterResStatus); extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param); extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 14dbfbd63e9019ee06fd2b70e412fb4dad277f5b..5c1c1454583782fef6d035345e00914469610eca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -193,6 +193,8 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); +static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); +static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -893,7 +895,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn if (pData->cid < pColData->info.colId) { colIndex += 1; } else if (pData->cid == pColData->info.colId) { - if (pData->flag == HAS_NONE || pData->flag == HAS_NULL) { + if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL|HAS_NONE)) { colDataAppendNNULL(pColData, 0, remain); } else { if (IS_NUMERIC_TYPE(pColData->info.type) && asc) { @@ -1537,7 +1539,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* minKey = k.ts; } - if (minKey > key && pBlockData->nRow > 0) { + if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { minKey = key; } } else { @@ -1550,7 +1552,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* minKey = k.ts; } - if (minKey < key && pBlockData->nRow > 0) { + if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { minKey = key; } } @@ -1688,7 +1690,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - if (pBlockData->nRow > 0) { + if (hasDataInFileBlock(pBlockData, pDumpInfo)) { // no last block available, only data block exists if (!hasDataInLastBlock(pLastBlockReader)) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); @@ -1753,7 +1755,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsLast = getCurrentKeyInLastBlock(pLastBlockReader); } - int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); @@ -1769,7 +1771,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = ik.ts; } - if (minKey > key && pBlockData->nRow > 0) { + if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { minKey = key; } @@ -1786,7 +1788,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = ik.ts; } - if (minKey < key && pBlockData->nRow > 0) { + if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { minKey = key; } @@ -2021,6 +2023,13 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } +bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { + if (pBlockData->nRow > 0) { + ASSERT(pBlockData->nRow == pDumpInfo->totalRows); + } + + return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); +} int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { @@ -2052,7 +2061,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; + int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); } else { @@ -3290,9 +3299,31 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } + +static int32_t doOpenReaderImpl(STsdbReader* pReader) { + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + + initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + + // no data in files, let's try buffer in memory + if (pReader->status.fileIter.numOfFiles == 0) { + pReader->status.loadFromFile = false; + return TSDB_CODE_SUCCESS; + } else { + return initForFirstBlockInFile(pReader, pBlockIter); + } +} + // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { + STimeWindow window = pCond->twindows; + if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { + pCond->twindows.skey += 1; + pCond->twindows.ekey -= 1; + } + int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -3300,21 +3331,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl // check for query time window STsdbReader* pReader = *ppReader; - if (isEmptyQueryTimeWindow(&pReader->window)) { + if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) { tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); return TSDB_CODE_SUCCESS; } if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { // update the SQueryTableDataCond to create inner reader - STimeWindow w = pCond->twindows; - int32_t order = pCond->order; + int32_t order = pCond->order; if (order == TSDB_ORDER_ASC) { - pCond->twindows.ekey = pCond->twindows.skey; + pCond->twindows.ekey = window.skey; pCond->twindows.skey = INT64_MIN; pCond->order = TSDB_ORDER_DESC; } else { - pCond->twindows.skey = pCond->twindows.ekey; + pCond->twindows.skey = window.ekey; pCond->twindows.ekey = INT64_MAX; pCond->order = TSDB_ORDER_ASC; } @@ -3326,12 +3356,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } if (order == TSDB_ORDER_ASC) { - pCond->twindows.skey = w.ekey; + pCond->twindows.skey = window.ekey; pCond->twindows.ekey = INT64_MAX; } else { pCond->twindows.skey = INT64_MIN; - pCond->twindows.ekey = w.ekey; + pCond->twindows.ekey = window.ekey; } + pCond->order = order; + code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -3340,20 +3372,22 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here. if (pCond->suid != 0) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); if (pReader->pSchema == NULL) { tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr); } } else if (taosArrayGetSize(pTableList) > 0) { STableKeyInfo* pKey = taosArrayGet(pTableList, 0); - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1); if (pReader->pSchema == NULL) { tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr); } } + STsdbReader* p = pReader->innerReader[0] != NULL? pReader->innerReader[0]:pReader; + int32_t numOfTables = taosArrayGetSize(pTableList); - pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables); + pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables); if (pReader->status.pTableMap == NULL) { tsdbReaderClose(pReader); *ppReader = NULL; @@ -3368,40 +3402,36 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { - SDataBlockIter* pBlockIter = &pReader->status.blockIter; - - initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order); - - // no data in files, let's try buffer in memory - if (pReader->status.fileIter.numOfFiles == 0) { - pReader->status.loadFromFile = false; - } else { - code = initForFirstBlockInFile(pReader, pBlockIter); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; } } else { - STsdbReader* pPrevReader = pReader->innerReader[0]; - SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; + STsdbReader* pPrevReader = pReader->innerReader[0]; + STsdbReader* pNextReader = pReader->innerReader[1]; - code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap, pReader->idStr); + // we need only one row + pPrevReader->capacity = 1; + pPrevReader->status.pTableMap = pReader->status.pTableMap; + pPrevReader->pReadSnap = pReader->pReadSnap; + + pNextReader->capacity = 1; + pNextReader->status.pTableMap = pReader->status.pTableMap; + pNextReader->pReadSnap = pReader->pReadSnap; + + code = doOpenReaderImpl(pPrevReader); if (code != TSDB_CODE_SUCCESS) { - goto _err; + return code; } - initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader); - resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order); + code = doOpenReaderImpl(pNextReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - // no data in files, let's try buffer in memory - if (pPrevReader->status.fileIter.numOfFiles == 0) { - pPrevReader->status.loadFromFile = false; - } else { - code = initForFirstBlockInFile(pPrevReader, pBlockIter); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; } } @@ -3418,6 +3448,19 @@ void tsdbReaderClose(STsdbReader* pReader) { return; } + { + if (pReader->innerReader[0] != NULL) { + pReader->innerReader[0]->status.pTableMap = NULL; + pReader->innerReader[0]->pReadSnap = NULL; + + pReader->innerReader[1]->status.pTableMap = NULL; + pReader->innerReader[1]->pReadSnap = NULL; + + tsdbReaderClose(pReader->innerReader[0]); + tsdbReaderClose(pReader->innerReader[1]); + } + } + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; taosMemoryFreeClear(pSupInfo->plist); @@ -3508,32 +3551,32 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } - if (pReader->innerReader[0] != NULL) { + if (pReader->innerReader[0] != NULL && pReader->step == 0) { bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); + resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey); + pReader->step = EXTERNAL_ROWS_PREV; + if (ret) { - pReader->step = EXTERNAL_ROWS_PREV; return ret; } + } - tsdbReaderClose(pReader->innerReader[0]); - pReader->innerReader[0] = NULL; + if (pReader->step == EXTERNAL_ROWS_PREV) { + pReader->step = EXTERNAL_ROWS_MAIN; } - pReader->step = EXTERNAL_ROWS_MAIN; bool ret = doTsdbNextDataBlock(pReader); if (ret) { return ret; } - if (pReader->innerReader[1] != NULL) { + if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) { + resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey); bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); + pReader->step = EXTERNAL_ROWS_NEXT; if (ret1) { - pReader->step = EXTERNAL_ROWS_NEXT; return ret1; } - - tsdbReaderClose(pReader->innerReader[1]); - pReader->innerReader[1] = NULL; } return false; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a1ff79bd03738a1f0195879985a9da9544630c03..31a7b0facfea798b40d11b7b6670c59057d4cb05 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -81,11 +81,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } -static int32_t getExprFunctionId(SExprInfo* pExprInfo) { - assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE); - return 0; -} - static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static void releaseQueryBuf(size_t numOfTables); @@ -1115,7 +1110,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } } -static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); +static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) { if (pFilterNode == NULL || pBlock->info.rows == 0) { @@ -1126,18 +1121,17 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM // todo move to the initialization function int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock}; + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; code = filterSetDataFromSlotId(filter, ¶m1); - int8_t* rowRes = NULL; + SColumnInfoData* p = NULL; + int32_t status = 0; // todo the keep seems never to be True?? - bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); + bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status); filterFreeInfo(filter); - extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); + extractQualifiedTupleByFilterResult(pBlock, p, keep, status); if (pColMatchInfo != NULL) { for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { @@ -1152,16 +1146,22 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM } } - taosMemoryFree(rowRes); + colDataDestroy(p); + taosMemoryFree(p); } -void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) { +void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) { if (keep) { return; } - if (rowRes != NULL) { - int32_t totalRows = pBlock->info.rows; + int32_t totalRows = pBlock->info.rows; + + if (status == FILTER_RESULT_ALL_QUALIFIED) { + // here nothing needs to be done + } else if (status == FILTER_RESULT_NONE_QUALIFIED) { + pBlock->info.rows = 0; + } else { SSDataBlock* px = createOneDataBlock(pBlock, true); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -1177,7 +1177,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR int32_t numOfRows = 0; for (int32_t j = 0; j < totalRows; ++j) { - if (rowRes[j] == 0) { + if (((int8_t*)p->pData)[j] == 0) { continue; } @@ -1189,6 +1189,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR numOfRows += 1; } + // todo this value can be assigned directly if (pBlock->info.rows == totalRows) { pBlock->info.rows = numOfRows; } else { @@ -1197,13 +1198,10 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } blockDataDestroy(px); // fix memory leak - } else { - // do nothing - pBlock->info.rows = 0; } } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; @@ -4246,10 +4244,10 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.finalize) { - int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - if (TAOS_FAILED(code)) { - qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); + int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(code1)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); + T_LONG_JMP(pTaskInfo->env, code1); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 660420aebb150b88e6077c871e2e311f47dfdeb1..b5ad3f9270a87a6d812ad8ae9929f5fd89535756 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2075,7 +2075,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo } } - pSliceInfo->fillLastPoint = isLastRow ? true : false; + pSliceInfo->fillLastPoint = isLastRow; } static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) { @@ -2294,15 +2294,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { SSDataBlock* pResBlock = pSliceInfo->pRes; SExprSupp* pSup = &pOperator->exprSupp; - // if (pOperator->status == OP_RES_TO_RETURN) { - // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - // if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) { - // doSetOperatorCompleted(pOperator); - // } - // - // return pResBlock; - // } - int32_t order = TSDB_ORDER_ASC; SInterval* pInterval = &pSliceInfo->interval; SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -2432,6 +2423,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { break; } } + } else { + // store ts value as start, and calculate interp value when processing next block + doKeepLinearInfo(pSliceInfo, pBlock, i, true); } } else { // non-linear interpolation if (i < pBlock->info.rows - 1) { @@ -2510,6 +2504,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { break; } } + } else { // it is the last row of current block + // store ts value as start, and calculate interp value when processing next block + doKeepLinearInfo(pSliceInfo, pBlock, i, true); } } else { // non-linear interpolation pSliceInfo->current = @@ -2615,6 +2612,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->interval.interval = pInterpPhyNode->interval; pInfo->current = pInfo->win.skey; + STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; + pScanInfo->cond.twindows = pInfo->win; + pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; + pOperator->name = "TimeSliceOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; pOperator->blocking = false; diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 26735fa263cfed15ead940493b3c1eadf0e29c70..bc10ce71aeeb49ca816d4ea95c489f412ef13da6 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -26,11 +26,6 @@ typedef struct SFuncMgtService { SHashObj* pFuncNameHashTable; } SFuncMgtService; -typedef struct SUdfInfo { - SDataType outputDt; - int8_t funcType; -} SUdfInfo; - static SFuncMgtService gFunMgtService; static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT; static int32_t initFunctionCode = 0; diff --git a/source/libs/scalar/inc/filterInt.h b/source/libs/scalar/inc/filterInt.h index e7695b2f04ea4fed2ebf9a77bf00717e1978003e..5d243d86e872be722eeaf885ec5a0ff9d3f6108e 100644 --- a/source/libs/scalar/inc/filterInt.h +++ b/source/libs/scalar/inc/filterInt.h @@ -99,7 +99,7 @@ typedef struct SFilterRange { typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t); typedef int32_t(*filter_desc_compare_func)(const void *, const void *); -typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SColumnDataAgg *, int16_t); +typedef bool(*filter_exec_func)(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols); typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **); typedef struct SFilterRangeCompare { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 9e676354374fce6c2e733ac8d42c45baef9bada8..dc6b505bb957f09de765181d6da8e6b71211eb56 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -2976,14 +2976,12 @@ _return: return TSDB_CODE_SUCCESS; } -bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; uint32_t *unitIdx = NULL; - if (*p == NULL) { - *p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); - } + int8_t* p = (int8_t*)pRes->pData; for (int32_t i = 0; i < numOfRows; ++i) { //FILTER_UNIT_CLR_F(info); @@ -3002,35 +3000,35 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p, uint8_t optr = cunit->optr; if (colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) { - (*p)[i] = optr == OP_TYPE_IS_NULL ? true : false; + p[i] = (optr == OP_TYPE_IS_NULL) ? true : false; } else { if (optr == OP_TYPE_IS_NOT_NULL) { - (*p)[i] = 1; + p[i] = 1; } else if (optr == OP_TYPE_IS_NULL) { - (*p)[i] = 0; + p[i] = 0; } else if (cunit->rfunc >= 0) { - (*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]); + p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]); } else { - (*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData); + p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData); } //FILTER_UNIT_SET_R(info, uidx, p[i]); //FILTER_UNIT_SET_F(info, uidx); } - if ((*p)[i] == 0) { + if (p[i] == 0) { break; } } - if ((*p)[i]) { + if (p[i]) { break; } unitIdx += unitNum; } - if ((*p)[i] == 0) { + if (p[i] == 0) { all = false; } } @@ -3040,7 +3038,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p, -int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) { +int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) { if (statis && numOfRows >= FILTER_RM_UNIT_MIN_ROWS) { info->blkFlag = 0; @@ -3058,7 +3056,6 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t* assert(info->unitNum > 1); *all = filterExecuteBasedOnStatisImpl(info, numOfRows, p, statis, numOfCols); - goto _return; } } @@ -3067,59 +3064,55 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t* _return: info->blkFlag = 0; - return TSDB_CODE_SUCCESS; } - -static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { +static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) { return true; } -static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { + +static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) { return false; } -static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { + +static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; - if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { - return all; - } + int8_t* p = (int8_t*)pRes->pData; - if (*p == NULL) { - *p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); + if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) { + return all; } for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); - (*p)[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); + p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); - if ((*p)[i] == 0) { + if (p[i] == 0) { all = false; } } return all; } -static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { +static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; - if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { + if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) { return all; } - if (*p == NULL) { - *p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); - } + int8_t* p = (int8_t*)pRes->pData; for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); - (*p)[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); - if ((*p)[i] == 0) { + p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); + if (p[i] == 0) { all = false; } } @@ -3127,7 +3120,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows return all; } -bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; uint16_t dataSize = info->cunits[0].dataSize; @@ -3136,13 +3129,11 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD void *valData2 = info->cunits[0].valData2; __compar_fn_t func = gDataCompare[info->cunits[0].func]; - if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { + if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) { return all; } - if (*p == NULL) { - *p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); - } + int8_t* p = (int8_t*) pRes->pData; for (int32_t i = 0; i < numOfRows; ++i) { void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i); @@ -3152,9 +3143,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD continue; } - (*p)[i] = (*rfunc)(colData, colData, valData, valData2, func); + p[i] = (*rfunc)(colData, colData, valData, valData2, func); - if ((*p)[i] == 0) { + if (p[i] == 0) { all = false; } } @@ -3162,23 +3153,21 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD return all; } -bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; - if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { + if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) { return all; } - if (*p == NULL) { - *p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); - } + int8_t* p = (int8_t*) pRes->pData; for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) { - (*p)[i] = 0; + p[i] = 0; all = false; continue; } @@ -3191,14 +3180,14 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa qError("castConvert1 taosUcs4ToMbs error"); }else{ varDataSetLen(newColData, len); - (*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData); + p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData); } taosMemoryFreeClear(newColData); }else{ - (*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData); + p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData); } - if ((*p)[i] == 0) { + if (p[i] == 0) { all = false; } } @@ -3207,17 +3196,15 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa } -bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; - if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { + if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) { return all; } - if (*p == NULL) { - *p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); - } + int8_t* p = (int8_t*) pRes->pData; for (int32_t i = 0; i < numOfRows; ++i) { //FILTER_UNIT_CLR_F(info); @@ -3235,14 +3222,14 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg uint8_t optr = cunit->optr; if (colData == NULL || colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) { - (*p)[i] = optr == OP_TYPE_IS_NULL ? true : false; + p[i] = optr == OP_TYPE_IS_NULL ? true : false; } else { if (optr == OP_TYPE_IS_NOT_NULL) { - (*p)[i] = 1; + p[i] = 1; } else if (optr == OP_TYPE_IS_NULL) { - (*p)[i] = 0; + p[i] = 0; } else if (cunit->rfunc >= 0) { - (*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]); + p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]); } else { if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){ char *newColData = taosMemoryCalloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1); @@ -3251,11 +3238,11 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg qError("castConvert1 taosUcs4ToMbs error"); }else{ varDataSetLen(newColData, len); - (*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData); + p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData); } taosMemoryFreeClear(newColData); }else{ - (*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData); + p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData); } } @@ -3263,17 +3250,17 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg //FILTER_UNIT_SET_F(info, uidx); } - if ((*p)[i] == 0) { + if (p[i] == 0) { break; } } - if ((*p)[i]) { + if (p[i]) { break; } } - if ((*p)[i] == 0) { + if (p[i] == 0) { all = false; } } @@ -4026,37 +4013,62 @@ _return: FLT_RET(code); } -bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t *pResultStatus) { if (NULL == info) { + *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; return false; } - if (info->scalarMode) { - SScalarParam output = {0}; + SScalarParam output = {0}; + SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; - SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; - int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + if (info->scalarMode) { SArray *pList = taosArrayInit(1, POINTER_BYTES); taosArrayPush(pList, &pSrc); FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output)); - *p = taosMemoryMalloc(output.numOfRows * sizeof(bool)); - - memcpy(*p, output.columnData->pData, output.numOfRows); - colDataDestroy(output.columnData); - taosMemoryFree(output.columnData); + *p = output.columnData; taosArrayDestroy(pList); + + if (output.numOfQualified == output.numOfRows) { + *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; + } else if (output.numOfQualified == 0) { + *pResultStatus = FILTER_RESULT_NONE_QUALIFIED; + } else { + *pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED; + } return false; - } + } else { + *p = output.columnData; + output.numOfRows = pSrc->info.rows; - return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols); -} + bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols); + // todo this should be return during filter procedure + int32_t num = 0; + for(int32_t i = 0; i < output.numOfRows; ++i) { + if (((int8_t*)((*p)->pData))[i] == 1) { + ++num; + } + } + + if (num == output.numOfRows) { + *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; + } else if (num == 0) { + *pResultStatus = FILTER_RESULT_NONE_QUALIFIED; + } else { + *pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED; + } + + return keep; + } +} typedef struct SClassifyConditionCxt { bool hasPrimaryKey; diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 05730c62acb79129468c09bc1097727bde8b05fc..9cba94d85a1ffbfe2f2f1634589039e3a96b7585 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -606,6 +606,8 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o SCL_ERR_JRET(code); } + int32_t numOfQualified = 0; + bool value = false; bool complete = true; for (int32_t i = 0; i < rowNum; ++i) { @@ -631,6 +633,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o if (complete) { colDataAppend(output->columnData, i, (char*) &value, false); + if (value) { + numOfQualified++; + } } } @@ -639,8 +644,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o output->numOfRows = 0; } -_return: + output->numOfQualified = numOfQualified; +_return: sclFreeParamList(params, paramNum); SCL_RET(code); } @@ -1242,6 +1248,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); pDst->numOfRows = res->numOfRows; + pDst->numOfQualified = res->numOfQualified; } sclFreeParam(res); @@ -1249,7 +1256,6 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { } _return: - //nodesDestroyNode(pNode); sclFreeRes(ctx.pRes); return code; } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index a003315fcabeab38f49ae3a6056e25dff10e4e16..fe2a970aaa811cc02f589c845e3c62b7e70af8e8 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1475,19 +1475,19 @@ void vectorMathMinus(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO void vectorAssign(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { SColumnInfoData *pOutputCol = pOut->columnData; - pOut->numOfRows = pLeft->numOfRows; -// if (IS_HELPER_NULL(pRight->columnData, 0)) { if(colDataIsNull_s(pRight->columnData, 0)){ - for (int32_t i = 0; i < pOut->numOfRows; ++i) { - colDataAppend(pOutputCol, i, NULL, true); - } + colDataAppendNNULL(pOutputCol, 0, pOut->numOfRows); } else { + char* d = colDataGetData(pRight->columnData, 0); for (int32_t i = 0; i < pOut->numOfRows; ++i) { - colDataAppend(pOutputCol, i, colDataGetData(pRight->columnData, 0), false); + colDataAppend(pOutputCol, i, d, false); } } + + ASSERT(pRight->numOfQualified == 1 || pRight->numOfQualified == 0); + pOut->numOfQualified = pRight->numOfQualified * pOut->numOfRows; } void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { @@ -1646,38 +1646,60 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, doReleaseVec(pRightCol, rightConvert); } -#define VEC_COM_INNER(pCol, index1, index2) \ - for (; i < pCol->numOfRows && i >= 0; i += step) {\ - if (IS_HELPER_NULL(pLeft->columnData, index1) || IS_HELPER_NULL(pRight->columnData, index2)) {\ - bool res = false;\ - colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\ - continue;\ - }\ - char *pLeftData = colDataGetData(pLeft->columnData, index1);\ - char *pRightData = colDataGetData(pRight->columnData, index2);\ - int64_t leftOut = 0;\ - int64_t rightOut = 0;\ - bool freeLeft = false;\ - bool freeRight = false;\ - bool isJsonnull = false;\ - bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight),\ - &pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);\ - if(isJsonnull){\ - ASSERT(0);\ - }\ - if(!pLeftData || !pRightData){\ - result = false;\ - }\ - if(!result){\ - colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);\ - }else{\ - bool res = filterDoCompare(fp, optr, pLeftData, pRightData);\ - colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\ - }\ - if(freeLeft) taosMemoryFreeClear(pLeftData);\ - if(freeRight) taosMemoryFreeClear(pRightData);\ +int32_t doVectorCompareImpl(int32_t numOfRows, SScalarParam *pOut, int32_t startIndex, int32_t step, __compar_fn_t fp, + SScalarParam *pLeft, SScalarParam *pRight, int32_t optr) { + int32_t num = 0; + + for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) { + int32_t leftIndex = (i >= pLeft->numOfRows)? 0:i; + int32_t rightIndex = (i >= pRight->numOfRows)? 0:i; + + if (IS_HELPER_NULL(pLeft->columnData, leftIndex) || IS_HELPER_NULL(pRight->columnData, rightIndex)) { + bool res = false; + colDataAppendInt8(pOut->columnData, i, (int8_t *)&res); + continue; + } + + char * pLeftData = colDataGetData(pLeft->columnData, leftIndex); + char * pRightData = colDataGetData(pRight->columnData, rightIndex); + int64_t leftOut = 0; + int64_t rightOut = 0; + bool freeLeft = false; + bool freeRight = false; + bool isJsonnull = false; + + bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), &pLeftData, &pRightData, + &leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight); + if (isJsonnull) { + ASSERT(0); + } + + if (!pLeftData || !pRightData) { + result = false; + } + + if (!result) { + colDataAppendInt8(pOut->columnData, i, (int8_t *)&result); + } else { + bool res = filterDoCompare(fp, optr, pLeftData, pRightData); + colDataAppendInt8(pOut->columnData, i, (int8_t *)&res); + if (res) { + ++num; + } + } + + if (freeLeft) { + taosMemoryFreeClear(pLeftData); + } + + if (freeRight) { + taosMemoryFreeClear(pRightData); + } } + return num; +} + void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) { int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; @@ -1704,16 +1726,12 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam * char *pLeftData = colDataGetData(pLeft->columnData, i); bool res = filterDoCompare(fp, optr, pLeftData, pRight->pHashFilter); colDataAppendInt8(pOut->columnData, i, (int8_t*)&res); + if (res) { + pOut->numOfQualified++; + } } - return; - } - - if (pLeft->numOfRows == pRight->numOfRows) { - VEC_COM_INNER(pLeft, i, i) - } else if (pRight->numOfRows == 1) { - VEC_COM_INNER(pLeft, i, 0) - } else if (pLeft->numOfRows == 1) { - VEC_COM_INNER(pRight, 0, i) + } else { // normal compare + pOut->numOfQualified = doVectorCompareImpl(pOut->numOfRows, pOut, i, step, fp, pLeft, pRight, optr); } } diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index 5550519e05249de13d1267dd2a8f5bc1b10fae6d..7bf0191ec176c8a833eb3ff16fbb007e456fe93f 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -595,11 +595,11 @@ class TDTestCase: tdSql.checkData(2, i, 15) tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(linear)") - tdSql.checkRows(1) + tdSql.checkRows(3) tdSql.checkCols(4) for i in range (tdSql.queryCols): - tdSql.checkData(0, i, 15) + tdSql.checkData(0, i, 13) tdLog.printNoPrefix("==========step10:test error cases")