diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index a01e426409964a75f38461aca7b536b01274b87c..ad6f600bbdffdfb3168ead549ee8eed4cc063f4e 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -327,7 +327,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pMerger->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx)); pMerger->rowSize = pMemBuffer[0]->nElemSize; - tscRestoreFuncForSTableQuery(pQueryInfo); +// tscRestoreFuncForSTableQuery(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo); if (pMerger->rowSize > pMemBuffer[0]->pageSize) { @@ -1162,7 +1162,9 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, for(int32_t j = 0; j < numOfExpr; ++j) { pCtx[j].pOutput += pCtx[j].outputBytes; - pCtx[j].pInput += pCtx[j].inputBytes; + pCtx[j].pInput += pCtx[j].inputBytes; + + aAggs[pCtx[j].functionId].init(&pCtx[j]); } for (int32_t j = 0; j < numOfExpr; ++j) { @@ -1174,7 +1176,6 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, pCtx[j].size = 1; aAggs[functionId].mergeFunc(&pCtx[j]); } - } } else { for (int32_t j = 0; j < numOfExpr; ++j) { @@ -1190,17 +1191,6 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, savePrevOrderColumns(pInfo, pBlock, i); } - - for(int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - aAggs[functionId].xFinalize(&pCtx[j]); - } - - pInfo->binfo.pRes->info.rows += 1; - } static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { @@ -1311,15 +1301,14 @@ bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index int32_t ret = 0; tOrderDescriptor *pDesc = pLocalMerge->pDesc; if (pDesc->orderInfo.numOfCols > 0) { - if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc - // todo refactor comparator +// if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc ret = compare_aRv(pBlock, pDesc->orderInfo.colIndex, pDesc->orderInfo.numOfCols, index, buf, TSDB_ORDER_ASC); - } else { // desc +// } else { // desc // ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); - } +// } } - /* if ret == 0, means the result belongs to the same group */ + // if ret == 0, means the result belongs to the same group return (ret == 0); } @@ -1912,12 +1901,75 @@ SSDataBlock* doGlobalAggregate(void* param) { // not belongs to the same group, return the result of current group; setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows); + doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pBlock, false); } + for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + int32_t functionId = pAggInfo->binfo.pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); + } + + pAggInfo->binfo.pRes->info.rows += 1; + pOperator->status = OP_EXEC_DONE; setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); return pAggInfo->binfo.pRes; } +SSDataBlock* doSLimit(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SSLimitOperatorInfo *pInfo = pOperator->info; + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + + SSDataBlock* pBlock = NULL; + while (1) { + pBlock = pOperator->upstream->exec(pOperator->upstream); + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + if (pRuntimeEnv->currentOffset == 0) { + break; + } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { + pRuntimeEnv->currentOffset -= pBlock->info.rows; + } else { + int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); + pBlock->info.rows = remain; + + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColInfoData->info.bytes; + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); + } + + pRuntimeEnv->currentOffset = 0; + break; + } + } + + if (pInfo->total + pBlock->info.rows >= pInfo->limit) { + pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total); + pInfo->total = pInfo->limit; + + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + } else { + pInfo->total += pBlock->info.rows; + } + + return pBlock; +} + diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index eda06612d94ed7f5465af0356e2a749d20330093..9e15d713505621d28e9257e7e2622125456b89b9 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3434,6 +3434,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfOutput = numOfOutput; pQueryAttr->limit = pQueryInfo->limit; + pQueryAttr->slimit = pQueryInfo->slimit; pQueryAttr->order = pQueryInfo->order; pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 124ddd06ade45229785aabddd6f155e51781ed90..61a74c5d0ba152079c82bfc93d06507369eb8937 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -181,6 +181,7 @@ typedef struct SSDataBlock { // execution of query in a data node. typedef struct SQueryAttr { SLimitVal limit; + SLimitVal slimit; bool stableQuery; // super table query or not bool topBotQuery; // TODO used bitwise flag @@ -412,10 +413,13 @@ typedef struct SArithOperatorInfo { uint32_t seed; } SArithOperatorInfo; -typedef struct SLimitOperatorInfo { - int64_t limit; - int64_t total; -} SLimitOperatorInfo; +typedef struct SSLimitOperatorInfo { + int64_t limit; + int64_t total; + char **prevRow; + bool hasPrev; + SArray *orderColumnList; +} SSLimitOperatorInfo; typedef struct SFillOperatorInfo { SFillInfo *pFillInfo; @@ -442,6 +446,7 @@ struct SLocalMerger; typedef struct SMultiwayMergeInfo { struct SLocalMerger *pMerge; SOptrBasicInfo binfo; + int32_t bufCapacity; int64_t seed; char **prevRow; bool hasPrev; @@ -465,12 +470,16 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows, void* merger); -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn, int32_t numOfOrder); +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); + SSDataBlock* doGlobalAggregate(void* param); +SSDataBlock* doSLimit(void* param); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 70c1e551d120ab50744e24f8dc975f802514a566..e37549965ddf6aba83e2d5b77357fb83d35bb73e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1728,7 +1728,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_MultiwaySort: { - pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, + pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); // TODO hack it break; } @@ -1736,7 +1736,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_GlobalAggregate: { pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1); + pQueryAttr->numOfExpr3); + break; + } + + case OP_SLimit: { + pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); break; } @@ -2925,12 +2930,11 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); } -void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { - SOptrBasicInfo* pBInfo = &pInfo->binfo; +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) { SSDataBlock* pDataBlock = pBInfo->pRes; int32_t newSize = pDataBlock->info.rows + numOfInputRows; - if (pInfo->bufCapacity < newSize) { + if ((*bufCapacity) < newSize) { for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes); @@ -2939,7 +2943,7 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { // it starts from the tail of the previously generated results. pBInfo->pCtx[i].pOutput = pColInfo->pData; - pInfo->bufCapacity = newSize; + (*bufCapacity) = newSize; } else { // longjmp } @@ -4300,56 +4304,75 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime return pOptr; } +SArray* getOrderCheckColumns(SQueryAttr* pQuery) { + int32_t numOfCols = pQuery->pGroupbyExpr->numOfGroupCols; + + SArray* pOrderColumns = NULL; + if (numOfCols > 0) { + pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo); + } + + if (pQuery->interval.interval > 0) { + if (pOrderColumns == NULL) { + pOrderColumns = taosArrayInit(1, sizeof(SColIndex)); + } + + SColIndex colIndex = {.colIndex = 0, .colId = 0, .flag = TSDB_COL_NORMAL}; + taosArrayPush(pOrderColumns, &colIndex); + } + + return pOrderColumns; +} + + SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, - SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn, - int32_t numOfOrder) { + SExprInfo* pExpr, int32_t numOfOutput) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); -// SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t numOfRows = 4096; // int32_t numOfRows = // (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + pInfo->bufCapacity = 4096; + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); + // TODO refactor int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { len += pExpr[i].base.resBytes; } - pInfo->prevRow = taosArrayInit(numOfOrder, (POINTER_BYTES * numOfOrder + len)); + int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0; + pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); int32_t offset = POINTER_BYTES * numOfOutput; - for(int32_t i = 0; i < numOfOrder; ++i) { + + for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; - int32_t index = orderColumn[i]; - if (index != INT32_MIN) { - offset += pExpr[index].base.resBytes; - } + SColIndex* index = taosArrayGet(pInfo->orderColumnList, i); + offset += pExpr[index->colIndex].base.resBytes; } - pInfo->orderColumnList = taosArrayFromList(orderColumn, numOfOrder, sizeof(int32_t)); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->seed = rand(); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "GlobalAggregate"; + pOperator->name = "GlobalAggregate"; pOperator->operatorType = OP_GlobalAggregate; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->upstream = upstream; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; - - pOperator->exec = doGlobalAggregate; - pOperator->cleanup = destroyBasicOperatorInfo; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->upstream = upstream; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; + + pOperator->exec = doGlobalAggregate; + pOperator->cleanup = destroyBasicOperatorInfo; return pOperator; } @@ -4492,7 +4515,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(pArithInfo, pBlock->info.rows); + updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows); arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); @@ -5120,9 +5143,28 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { - SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); - pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { + SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); + pInfo->limit = pRuntimeEnv->pQueryAttr->slimit.limit; + + pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); + + // TODO refactor + int32_t len = 0; + for(int32_t i = 0; i < numOfOutput; ++i) { + len += pExpr[i].base.resBytes; + } + + int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0; + pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + int32_t offset = POINTER_BYTES * numOfOutput; + + for(int32_t i = 0; i < numOfCols; ++i) { + pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; + + SColIndex* index = taosArrayGet(pInfo->orderColumnList, i); + offset += pExpr[index->colIndex].base.resBytes; + } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -5131,7 +5173,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; - pOperator->exec = doLimit; + pOperator->exec = doSLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv;