From cbe712b4ea7b3ca699903c0383228108de266005 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Apr 2021 18:32:01 +0800 Subject: [PATCH] [td-225] --- src/client/src/tscLocalMerge.c | 259 ++++++++++++++++++++++++++------- src/client/src/tscServer.c | 4 +- src/query/inc/qExecutor.h | 32 +++- src/query/inc/qExtbuffer.h | 2 + src/query/src/qExecutor.c | 115 ++++++++++++++- src/query/src/qExtbuffer.c | 2 +- src/query/src/qPlan.c | 2 +- 7 files changed, 347 insertions(+), 69 deletions(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index ad6f600bbd..4dc274fc5e 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1119,22 +1119,29 @@ static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, b } } -static void savePrevOrderColumns(SMultiwayMergeInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) { - int32_t size = pInfo->pMerge->pDesc->orderInfo.numOfCols; +//TODO it is not ordered, fix it +static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex, bool* hasPrev) { + int32_t size = (int32_t) taosArrayGetSize(pColumnList); + for(int32_t i = 0; i < size; ++i) { - int32_t index = pInfo->pMerge->pDesc->orderInfo.colIndex[i]; -// int32_t index = *(int16_t*)taosArrayGet(pInfo->orderColumnList, i); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index); + SColIndex* index = taosArrayGet(pColumnList, i); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index->colIndex); + assert(index->colId == pColInfo->info.colId); - memcpy(pInfo->prevRow[i], pColInfo->pData + pColInfo->info.bytes * rowIndex, pColInfo->info.bytes); + memcpy(prevRow[i], pColInfo->pData + pColInfo->info.bytes * rowIndex, pColInfo->info.bytes); } - pInfo->hasPrev = true; + (*hasPrev) = true; } static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) { SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; + char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES); + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + add[i] = pCtx[i].pInput; + } + for(int32_t i = 0; i < pBlock->info.rows; ++i) { if (pInfo->hasPrev) { if (needToMergeRv(pBlock, pInfo->pMerge, i, pInfo->prevRow)) { @@ -1160,6 +1167,23 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, pInfo->binfo.pRes->info.rows += 1; + if (i == 0) { + for(int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pOutput += pCtx[j].outputBytes; + aAggs[pCtx[j].functionId].init(&pCtx[j]); + } + + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + pCtx[j].size = 1; + aAggs[functionId].mergeFunc(&pCtx[j]); + } + } + for(int32_t j = 0; j < numOfExpr; ++j) { pCtx[j].pOutput += pCtx[j].outputBytes; pCtx[j].pInput += pCtx[j].inputBytes; @@ -1188,9 +1212,16 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, aAggs[functionId].mergeFunc(&pCtx[j]); } } + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); + } - savePrevOrderColumns(pInfo, pBlock, i); + { + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + pCtx[i].pInput = add[i]; + } } + + tfree(add); } static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { @@ -1804,16 +1835,14 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); char* p = pColInfo->pData + pBlock->info.rows * pColInfo->info.bytes; -// char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->num, col); char *src = COLMODEL_GET_VAL(buf, pModel, maxRows, rowIndex, i); -// char* src = buf + rowIndex * pColInfo->info.bytes; memmove(p, src, pColInfo->info.bytes); } pBlock->info.rows += 1; } -static SSDataBlock* doMultiwaySort(void* param) { +SSDataBlock* doMultiwaySort(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -1841,6 +1870,50 @@ static SSDataBlock* doMultiwaySort(void* param) { // chosen from loser tree SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; + bool sameGroup = true; + if (pInfo->hasPrev) { + int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); + + // if this row belongs to current result set group + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); + SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex); + + char *newRow = + COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity, + pOneDataSrc->rowIdx, pIndex->colIndex); + + char * data = pInfo->prevRow[i]; + int32_t ret = columnValueAscendingComparator(data, newRow, pColInfo->info.type, pColInfo->info.bytes); + if (ret == 0) { + continue; + } else { + sameGroup = false; + break; + } + } + } + + if (!sameGroup || !pInfo->hasPrev) { //save the data + int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); + SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex); + + char *curCol = + COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity, + pOneDataSrc->rowIdx, pIndex->colIndex); + memcpy(pInfo->prevRow[i], curCol, pColInfo->info.bytes); + } + + pInfo->hasPrev = true; + } + + if (!sameGroup && pInfo->binfo.pRes->info.rows > 0) { + return pInfo->binfo.pRes; + } + appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pModel, pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity); #if defined(_DEBUG_VIEW) @@ -1854,7 +1927,7 @@ static SSDataBlock* doMultiwaySort(void* param) { pOneDataSrc->rowIdx += 1; adjustLoserTreeFromNewData(pMerger, pOneDataSrc, pTree); - if (pInfo->binfo.pRes->info.rows >= 4096) { // TODO threshold + if (pInfo->binfo.pRes->info.rows >= pInfo->bufCapacity) { return pInfo->binfo.pRes; } } @@ -1862,26 +1935,6 @@ static SSDataBlock* doMultiwaySort(void* param) { return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL; } -SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, - int32_t numOfRows, void *merger) { - SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); - - pInfo->pMerge = merger; - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); - - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "MultiwaySortOperator"; - pOperator->operatorType = OP_MultiwaySort; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->exec = doMultiwaySort; - - return pOperator; -} - SSDataBlock* doGlobalAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -1890,15 +1943,55 @@ SSDataBlock* doGlobalAggregate(void* param) { SMultiwayMergeInfo* pAggInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; +// SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SOperatorInfo *upstream = pOperator->upstream; + { + if (pAggInfo->hasDataBlockForNewGroup) { + // not belongs to the same group, return the result of current group; + setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows); + + doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pAggInfo->pExistBlock, false); + + savePrevOrderColumns(pAggInfo->groupPrevRow, pAggInfo->groupColumnList, pAggInfo->pExistBlock, 0, + &pAggInfo->hasPrev); + pAggInfo->pExistBlock = NULL; + pAggInfo->hasDataBlockForNewGroup = false; + } + } + while(1) { SSDataBlock* pBlock = upstream->exec(upstream); if (pBlock == NULL) { break; } + if (pAggInfo->hasPrev) { + bool sameGroup = true; + int32_t numOfCols = (int32_t) taosArrayGetSize(pAggInfo->groupColumnList); + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex *pIndex = taosArrayGet(pAggInfo->groupColumnList, i); + SColumnInfoData *pColInfo = taosArrayGet(pAggInfo->binfo.pRes->pDataBlock, pIndex->colIndex); + + char *data = pAggInfo->groupPrevRow[i]; + int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes); + if (ret == 0) { + continue; + } else { + sameGroup = false; + break; + } + } + + if (!sameGroup) { + pAggInfo->hasDataBlockForNewGroup = true; + pAggInfo->pExistBlock = pBlock; + savePrevOrderColumns(pAggInfo->prevRow, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasPrev); + break; + } + } + // not belongs to the same group, return the result of current group; setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows); @@ -1911,13 +2004,14 @@ SSDataBlock* doGlobalAggregate(void* param) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } + aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); } pAggInfo->binfo.pRes->info.rows += 1; - pOperator->status = OP_EXEC_DONE; - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); +// pOperator->status = OP_EXEC_DONE; +// setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); return pAggInfo->binfo.pRes; } @@ -1929,7 +2023,6 @@ SSDataBlock* doSLimit(void* param) { } SSLimitOperatorInfo *pInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SSDataBlock* pBlock = NULL; while (1) { @@ -1940,34 +2033,96 @@ SSDataBlock* doSLimit(void* param) { return NULL; } - if (pRuntimeEnv->currentOffset == 0) { - break; - } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { - pRuntimeEnv->currentOffset -= pBlock->info.rows; + if (pInfo->currentGroupOffset == 0) { + if (pInfo->currentOffset == 0) { // TODO refactor + break; + } else if (pInfo->currentOffset >= pBlock->info.rows) { + pInfo->currentOffset -= pBlock->info.rows; + } else { + int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); + pBlock->info.rows = remain; + + // move the remain rows of this data block to the front. + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColInfoData->info.bytes; + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); + } + + pInfo->currentOffset = 0; + break; + } } else { - int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); - pBlock->info.rows = remain; + if (pInfo->hasPrev) { + // Check if current data block belongs to current result group or not +// if (needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) { + bool sameGroup = true; + int32_t numOfCols = (int32_t) taosArrayGetSize(pInfo->orderColumnList); + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); + + char *data = pInfo->prevRow[i]; + int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes); + if (ret == 0) { + continue; + } else { + sameGroup = false; + break; + } + } - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (sameGroup) { + continue; // ignore the data block of the same group and try next + } else { + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); + pInfo->currentOffset = pInfo->limit.offset; // set the offset value for a new group + pInfo->rowsTotal = 0; + + if ((--pInfo->currentGroupOffset) == 0) { + if (pInfo->currentOffset == 0) { // TODO refactor + break; + } else if (pInfo->currentOffset >= pBlock->info.rows) { + pInfo->currentOffset -= pBlock->info.rows; + } else { + int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); + pBlock->info.rows = remain; + + // move the remain rows of this data block to the front. + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColInfoData->info.bytes; + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); + } - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); + pInfo->currentOffset = 0; + break; + } + } + } + } else { + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); } + } + } - pRuntimeEnv->currentOffset = 0; - break; + if (!pInfo->hasPrev || !needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) { + pInfo->groupTotal += 1; + if (pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort + return NULL; } } - if (pInfo->total + pBlock->info.rows >= pInfo->limit) { - pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total); - pInfo->total = pInfo->limit; + if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { + pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); + pInfo->rowsTotal = pInfo->limit.limit; setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; } else { - pInfo->total += pBlock->info.rows; + pInfo->rowsTotal += pBlock->info.rows; } return pBlock; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a3a8bc8784..ffa11d9552 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1594,8 +1594,8 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { } uint64_t localQueryId = 0; - SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info; - pInfo->pMerge = pRes->pLocalMerger; +// SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info; +// pInfo->pMerge = pRes->pLocalMerger; qTableQuery(pQueryInfo->pQInfo, &localQueryId); SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 61a74c5d0b..320c00e9d5 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -413,9 +413,22 @@ typedef struct SArithOperatorInfo { uint32_t seed; } SArithOperatorInfo; -typedef struct SSLimitOperatorInfo { +typedef struct SLimitOperatorInfo { int64_t limit; int64_t total; +} SLimitOperatorInfo; + +typedef struct SSLimitOperatorInfo { + int64_t groupTotal; + int64_t currentGroupOffset; + + int64_t rowsTotal; + int64_t currentOffset; + + SLimitVal limit; + SLimitVal slimit; + + struct SLocalMerger *pMerger; char **prevRow; bool hasPrev; SArray *orderColumnList; @@ -449,8 +462,15 @@ typedef struct SMultiwayMergeInfo { int32_t bufCapacity; int64_t seed; char **prevRow; - bool hasPrev; SArray *orderColumnList; + + char **groupPrevRow; + SArray *groupColumnList; + bool hasDataBlockForNewGroup; + SSDataBlock *pExistBlock; + + bool hasPrev; + bool groupMix; } SMultiwayMergeInfo; SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); @@ -469,12 +489,14 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, - int32_t numOfRows, void* merger); -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); + int32_t numOfRows, void* merger, bool groupMix); +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SSDataBlock* doGlobalAggregate(void* param); +SSDataBlock* doMultiwaySort(void* param); SSDataBlock* doSLimit(void* param); + SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index 98a7872ee2..8e64810c6c 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -240,6 +240,8 @@ int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1 struct SSDataBlock; int32_t compare_aRv(struct SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order); +int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes); + #ifdef __cplusplus } #endif diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e37549965d..09fcb303d2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1728,20 +1728,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_MultiwaySort: { + bool groupMix = true; + if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { + groupMix = false; + } pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, - 4096, merger); // TODO hack it + 4096, merger, groupMix); // TODO hack it break; } case OP_GlobalAggregate: { pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3); + pQueryAttr->numOfExpr3, merger); break; } case OP_SLimit: { - pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); + pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, + pQueryAttr->numOfExpr3, merger); break; } @@ -4321,22 +4326,56 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { taosArrayPush(pOrderColumns, &colIndex); } + { + numOfCols = (int32_t) taosArrayGetSize(pOrderColumns); + for(int32_t i = 0; i < numOfCols; ++i) { + SColIndex* index = taosArrayGet(pOrderColumns, i); + for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { + if (index->colId == pQuery->pExpr1[j].base.colInfo.colId) { + index->colIndex = j; + index->colId = pQuery->pExpr1[j].base.resColId; + } + } + } + } return pOrderColumns; } +SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) { + int32_t numOfCols = pQuery->pGroupbyExpr->numOfGroupCols; + + SArray* pOrderColumns = NULL; + if (numOfCols > 0) { + pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo); + } + + for(int32_t i = 0; i < numOfCols; ++i) { + SColIndex* index = taosArrayGet(pOrderColumns, i); + for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { + if (index->colId == pQuery->pExpr1[j].base.colInfo.colId) { + index->colIndex = j; + index->colId = pQuery->pExpr1[j].base.resColId; + } + } + } + + return pOrderColumns; +} SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, - SExprInfo* pExpr, int32_t numOfOutput) { + SExprInfo* pExpr, int32_t numOfOutput, void* param) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); // int32_t numOfRows = // (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); + pInfo->pMerge = param; pInfo->bufCapacity = 4096; pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); // TODO refactor int32_t len = 0; @@ -4344,7 +4383,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, len += pExpr[i].base.resBytes; } - int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0; + int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); int32_t offset = POINTER_BYTES * numOfOutput; @@ -4355,6 +4394,17 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, offset += pExpr[index->colIndex].base.resBytes; } + numOfCols = (pInfo->groupColumnList != NULL)? taosArrayGetSize(pInfo->groupColumnList):0; + pInfo->groupPrevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + offset = POINTER_BYTES * numOfOutput; + + for(int32_t i = 0; i < numOfCols; ++i) { + pInfo->groupPrevRow[i] = (char*)pInfo->groupPrevRow + offset; + + SColIndex* index = taosArrayGet(pInfo->groupColumnList, i); + offset += pExpr[index->colIndex].base.resBytes; + } + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->seed = rand(); @@ -4376,6 +4426,48 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, return pOperator; } +SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, + int32_t numOfRows, void *merger, bool groupMix) { + SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); + + pInfo->pMerge = merger; + pInfo->groupMix = groupMix; + pInfo->bufCapacity = numOfRows; + + pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + + { + int32_t len = 0; + for(int32_t i = 0; i < numOfOutput; ++i) { + len += pExpr[i].base.resBytes; + } + + int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0; + pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + int32_t offset = POINTER_BYTES * numOfOutput; + + for(int32_t i = 0; i < numOfCols; ++i) { + pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; + + SColIndex* index = taosArrayGet(pInfo->orderColumnList, i); + offset += pExpr[index->colIndex].base.resBytes; + } + } + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "MultiwaySortOperator"; + pOperator->operatorType = OP_MultiwaySort; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->exec = doMultiwaySort; + + return pOperator; +} + static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } @@ -5143,11 +5235,18 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); - pInfo->limit = pRuntimeEnv->pQueryAttr->slimit.limit; - pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + + pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); + pInfo->pMerger = pMerger; + pInfo->slimit = pQueryAttr->slimit; + pInfo->limit = pQueryAttr->limit; + + pInfo->currentGroupOffset = pQueryAttr->slimit.offset; + pInfo->currentOffset = pQueryAttr->limit.offset; // TODO refactor int32_t len = 0; diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index 8d4c24ee67..71aa844b86 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -364,7 +364,7 @@ static int32_t tsCompareFunc(TSKEY k1, TSKEY k2, int32_t order) { } } -static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) { +int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) { switch (type) { case TSDB_DATA_TYPE_INT: { int32_t first = *(int32_t *) f1; diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index a7c842e343..9d18bae5cd 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -152,7 +152,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { } // limit/offset operator - if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { + if (pQueryAttr->slimit.limit > 0 || pQueryAttr->slimit.offset > 0) { op = OP_SLimit; taosArrayPush(plan, &op); } -- GitLab