From 6863999ec5f5974dcd807d6eaa51e6ee48dd26a7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Apr 2022 17:59:08 +0800 Subject: [PATCH] [td-14493] support having in group by --- source/dnode/vnode/inc/tsdb.h | 4 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 +- source/libs/executor/inc/executorimpl.h | 12 +- source/libs/executor/src/executorimpl.c | 210 ++++++----------------- source/libs/executor/src/groupoperator.c | 197 +++++++++++++++++++-- source/libs/executor/src/scanoperator.c | 35 +--- source/libs/executor/src/tsimplehash.c | 6 +- source/libs/function/src/builtinsimpl.c | 2 +- source/util/src/thash.c | 16 +- 9 files changed, 262 insertions(+), 224 deletions(-) diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 3cabb6ce2e..38d6d879fa 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -172,9 +172,9 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); -int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo); +int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* pReader, STableBlockDistInfo* pTableBlockInfo); -bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle); +bool isTsdbCacheLastRow(tsdbReaderT* pReader); /** * diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4a3e764b13..144aa7fcdc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3046,8 +3046,8 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) { // return code; //} -bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle) { - return ((STsdbReadHandle *)pTsdbReadHandle)->cachelastrow > TSDB_CACHED_TYPE_NONE; +bool isTsdbCacheLastRow(tsdbReaderT* pReader) { + return ((STsdbReadHandle *)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE; } int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3cbcbc004b..bf9377c83d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -551,6 +551,7 @@ typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; SArray* pGroupCols; SArray* pGroupColVals; // current group column values, SArray + SNode* pCondition; bool isInit; // denote if current val is initialized or not char* keyBuf; // group by keys for hash int32_t groupKeyLen; // total group by column width @@ -630,9 +631,10 @@ typedef struct SDistinctOperatorInfo { SHashObj* pSet; SSDataBlock* pRes; bool recordNullVal; // has already record the null value, no need to try again - int64_t threshold; // todo remove it - int64_t outputCapacity;// todo remove it - int32_t totalBytes; // todo remove it +// int64_t threshold; // todo remove it +// int64_t outputCapacity;// todo remove it +// int32_t totalBytes; // todo remove it + SResultInfo resInfo; char* buf; SArray* pDistinctDataInfo; } SDistinctOperatorInfo; @@ -651,6 +653,8 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); +void doSetOperatorCompleted(SOperatorInfo* pOperator); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, @@ -667,7 +671,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); + SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3e47356223..2323291fc1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -51,8 +51,6 @@ #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) -#define MULTI_KEY_DELIM "-" - enum { TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_NOT_EQUALS = 1, @@ -221,12 +219,13 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); -static void doSetOperatorCompleted(SOperatorInfo* pOperator) { +void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; if (pOperator->pTaskInfo != NULL) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } } + #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) @@ -1245,11 +1244,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction } static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { - size_t num = 0; - if (pPseudoList != NULL) { - num = taosArrayGetSize(pPseudoList); - } - + size_t num = (pPseudoList != NULL)? taosArrayGetSize(pPseudoList):0; for (int32_t i = 0; i < num; ++i) { pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i); } @@ -1291,8 +1286,10 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k); pCtx[k].offset = pResult->info.rows; // set the start offset - int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); - pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; + if (taosArrayGetSize(pPseudoList) > 0) { + int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); + pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; + } int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); pResult->info.rows += numOfRows; @@ -3272,6 +3269,44 @@ void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, } } +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { + if (pFilterNode == NULL) { + return; + } + + SFilterInfo* filter = NULL; + int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); + + SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; + code = filterSetDataFromSlotId(filter, ¶m1); + + int8_t* rowRes = NULL; + bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); + + SSDataBlock* px = createOneDataBlock(pBlock); + blockDataEnsureCapacity(px, pBlock->info.rows); + + int32_t numOfRow = 0; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); + + numOfRow = 0; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + if (rowRes[j] == 0) { + continue; + } + + colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); + numOfRow += 1; + } + + *pSrc = *pDst; + } + + pBlock->info.rows = numOfRow; +} + void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) { // for simple group by query without interval, all the tables belong to one group result. int64_t uid = 0; @@ -6206,14 +6241,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pSortInfo); } -static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { - SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; - taosHashCleanup(pInfo->pSet); - taosMemoryFreeClear(pInfo->buf); - taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = blockDataDestroy(pInfo->pRes); -} - void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosArrayDestroy(pExInfo->pSources); @@ -6785,146 +6812,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo return pOperator; } -static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator, SSDataBlock* pBlock) { - if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) { - // distinct info already inited - return true; - } - for (int i = 0; i < pOperator->numOfOutput; i++) { - // pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; - } - for (int i = 0; i < pOperator->numOfOutput; i++) { - int numOfBlock = (int)(taosArrayGetSize(pBlock->pDataBlock)); - assert(i < numOfBlock); - for (int j = 0; j < numOfBlock; j++) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j); - if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) { - SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes}; - taosArrayInsert(pInfo->pDistinctDataInfo, i, &item); - } - } - } - pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput); - pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes); - return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false; -} - -static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) { - char* p = pInfo->buf; - memset(p, 0, pInfo->totalBytes); - - for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { - SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i); - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); - char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; - if (isNull(val, pDistDataInfo->type)) { - p += pDistDataInfo->bytes; - continue; - } - if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { - memcpy(p, varDataVal(val), varDataLen(val)); - p += varDataLen(val); - } else { - memcpy(p, val, pDistDataInfo->bytes); - p += pDistDataInfo->bytes; - } - memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); - p += strlen(MULTI_KEY_DELIM); - } -} - -static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SDistinctOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pRes = pInfo->pRes; - - pRes->info.rows = 0; - SSDataBlock* pBlock = NULL; - - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; - while (1) { - publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pDownstream->getNextFn(pDownstream, newgroup); - publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); - break; - } - if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) { - doSetOperatorCompleted(pOperator); - break; - } - - // ensure result output buf - if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { - int32_t newSize = pRes->info.rows + pBlock->info.rows; - for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i); - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i); - char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes); - if (tmp == NULL) { - return NULL; - } else { - pResultColInfoData->pData = tmp; - } - } - pInfo->outputCapacity = newSize; - } - - for (int32_t i = 0; i < pBlock->info.rows; i++) { - buildMultiDistinctKey(pInfo, pBlock, i); - if (taosHashGet(pInfo->pSet, pInfo->buf, pInfo->totalBytes) == NULL) { - int32_t dummy; - taosHashPut(pInfo->pSet, pInfo->buf, pInfo->totalBytes, &dummy, sizeof(dummy)); - for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist - - char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; - char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; - memcpy(start, val, pDistDataInfo->bytes); - } - pRes->info.rows += 1; - } - } - - if (pRes->info.rows >= pInfo->threshold) { - break; - } - } - return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; -} - -SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { - SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - - pOperator->resultInfo.capacity = 4096; // todo extract function. - - pInfo->totalBytes = 0; - pInfo->buf = NULL; - pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo)); - pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - - pOperator->name = "DistinctOperator"; - pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; -// pOperator->operatorType = DISTINCT; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; - pOperator->info = pInfo; - pOperator->getNextFn = hashDistinct; - pOperator->closeFn = destroyDistinctOperatorInfo; - - int32_t code = appendDownstream(pOperator, &downstream, 1); - return pOperator; -} static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { int32_t j = 0; @@ -7147,7 +7035,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { @@ -7157,8 +7044,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - queryId, taskId); + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); @@ -7215,7 +7101,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pAggNode->pGroupKeys != NULL) { SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); } else { return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } @@ -7306,7 +7192,7 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa continue; } - cond.colList[j].type = pColNode->node.resType.type; + cond.colList[j].type = pColNode->node.resType.type; cond.colList[j].bytes = pColNode->node.resType.bytes; cond.colList[j].colId = pColNode->colId; j += 1; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 90cb97f6c7..c22b58a0d8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -245,13 +245,14 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou } SGroupbyOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pRes = pInfo->binfo.pRes; + if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, - pInfo->binfo.rowCellInfoOffset); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); + if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pInfo->binfo.pRes; + return pRes; } int32_t order = TSDB_ORDER_ASC; @@ -283,18 +284,29 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou // pInfo->binfo.rowCellInfoOffset); // } - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); + blockDataEnsureCapacity(pRes, pInfo->binfo.capacity); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, - pInfo->binfo.rowCellInfoOffset); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; + + while(1) { + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, + pInfo->binfo.rowCellInfoOffset); + doFilter(pInfo->pCondition, pRes); + + bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); + if (!hasRemain) { + pOperator->status = OP_EXEC_DONE; + break; + } + + if (pRes->info.rows > 0) { + break; + } } return pInfo->binfo.pRes; } -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -303,6 +315,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx } pInfo->pGroupCols = pGroupColList; + pInfo->pCondition = pCondition; initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); @@ -329,4 +342,166 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; -} \ No newline at end of file +} + +#define MULTI_KEY_DELIM "-" + +static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { + SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; + taosHashCleanup(pInfo->pSet); + taosMemoryFreeClear(pInfo->buf); + taosArrayDestroy(pInfo->pDistinctDataInfo); + pInfo->pRes = blockDataDestroy(pInfo->pRes); +} + +static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) { + char* p = pInfo->buf; +// memset(p, 0, pInfo->totalBytes); + + for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { + SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i); + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); + + char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; + if (isNull(val, pDistDataInfo->type)) { + p += pDistDataInfo->bytes; + continue; + } + if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { + memcpy(p, varDataVal(val), varDataLen(val)); + p += varDataLen(val); + } else { + memcpy(p, val, pDistDataInfo->bytes); + p += pDistDataInfo->bytes; + } + memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); + p += strlen(MULTI_KEY_DELIM); + } +} + +static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) { + for (int i = 0; i < pOperator->numOfOutput; i++) { + // pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; + } +#if 0 + for (int i = 0; i < pOperator->numOfOutput; i++) { + int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock)); + assert(i < numOfCols); + + for (int j = 0; j < numOfCols; j++) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j); + if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) { + SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes}; + taosArrayInsert(pInfo->pDistinctDataInfo, i, &item); + } + } + } +#endif + +// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput); +// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes); + return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false; +} + +static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SDistinctOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pRes = pInfo->pRes; + + pRes->info.rows = 0; + SSDataBlock* pBlock = NULL; + + SOperatorInfo* pDownstream = pOperator->pDownstream[0]; + while (1) { + publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pDownstream->getNextFn(pDownstream, newgroup); + publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + + if (pBlock == NULL) { + doSetOperatorCompleted(pOperator); + break; + } + + // ensure result output buf + if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) { + int32_t newSize = pRes->info.rows + pBlock->info.rows; + for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i); + SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i); + +// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes); +// if (tmp == NULL) { +// return NULL; +// } else { +// pResultColInfoData->pData = tmp; +// } + } + pInfo->resInfo.capacity = newSize; + } + + for (int32_t i = 0; i < pBlock->info.rows; i++) { + buildMultiDistinctKey(pInfo, pBlock, i); + if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) { + taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0); + + for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { + SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist + + char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; + char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; + memcpy(start, val, pDistDataInfo->bytes); + } + + pRes->info.rows += 1; + } + } + + if (pRes->info.rows >= pInfo->resInfo.threshold) { + break; + } + } + + return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; +} + +SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { + SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pOperator->resultInfo.capacity = 4096; // todo extract function. + +// pInfo->totalBytes = 0; + pInfo->buf = NULL; + + pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo)); + initMultiDistinctInfo(pInfo, pOperator); + + pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + + pOperator->name = "DistinctOperator"; + pOperator->blockingOptr = true; + pOperator->status = OP_NOT_OPENED; +// pOperator->operatorType = DISTINCT; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->getNextFn = hashDistinct; + pOperator->closeFn = destroyDistinctOperatorInfo; + + int32_t code = appendDownstream(pOperator, &downstream, 1); + return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ace84c5f9a..f39a18da68 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -34,6 +34,7 @@ #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) + void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { SWITCH_ORDER(pCtx[i].order); @@ -91,39 +92,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); } - if (pTableScanInfo->pFilterNode != NULL) { - SFilterInfo* filter = NULL; - int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0); - - SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; - code = filterSetDataFromSlotId(filter, ¶m1); - - int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); - - SSDataBlock* px = createOneDataBlock(pBlock); - blockDataEnsureCapacity(px, pBlock->info.rows); - - int32_t numOfRow = 0; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); - - numOfRow = 0; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - if (rowRes[j] == 0) { - continue; - } - - colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); - numOfRow += 1; - } - *pSrc = *pDst; - } - - pBlock->info.rows = numOfRow; - } - + doFilter(pTableScanInfo->pFilterNode, pBlock); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 981da0415e..5c10216a96 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -37,7 +37,7 @@ typedef struct SHNode { char data[]; } SHNode; -typedef struct SSHashObj { +struct SSHashObj { SHNode **hashList; size_t capacity; // number of slots int64_t size; // number of elements in hash table @@ -45,7 +45,7 @@ typedef struct SSHashObj { _equal_fn_t equalFp; // equal function int32_t keyLen; int32_t dataLen; -} SSHashObj; +}; static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { int32_t len = MIN(length, HASH_MAX_CAPACITY); @@ -107,7 +107,7 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pDat return pNewNode; } -void taosHashTableResize(SSHashObj *pHashObj) { +static void taosHashTableResize(SSHashObj *pHashObj) { if (!HASH_NEED_RESIZE(pHashObj)) { return; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6f7fcd37be..fb6abe3c5d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -917,7 +917,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { colDataAppendInt32(pOutput, pos, &delta); } - if (tsList != NULL) { + if (pTsOutput != NULL) { colDataAppendInt64(pTsOutput, pos, &tsList[i]); } } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 88e5c12770..809d008aa7 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -311,10 +311,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo } uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); - SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); - if (pNewNode == NULL) { - return -1; - } // need the resize process, write lock applied if (HASH_NEED_RESIZE(pHashObj)) { @@ -355,6 +351,11 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table + SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); + if (pNewNode == NULL) { + return -1; + } + pushfrontNodeInEntryList(pe, pNewNode); assert(pe->next != NULL); @@ -368,9 +369,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo } else { // not support the update operation, return error if (pHashObj->enableUpdate) { + SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); + if (pNewNode == NULL) { + return -1; + } + doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); - } else { - FREE_HASH_NODE(pNewNode); } taosHashEntryWUnlock(pHashObj, pe); -- GitLab