From 5746730a2dcb56b532b45e07dd2db420ffb15f5b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Feb 2021 23:14:22 +0800 Subject: [PATCH] [td-2895] refactor. --- src/query/inc/qExecutor.h | 13 +- src/query/src/qExecutor.c | 331 ++++++++++++++++++-------------------- src/query/src/queryMain.c | 25 +-- 3 files changed, 172 insertions(+), 197 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a9dc7f00f9..a30d5646e7 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -106,12 +106,11 @@ typedef struct SGroupResInfo { * If the number of generated results is greater than this value, * query query will be halt and return results to client immediate. */ -typedef struct SResultRec { +typedef struct SRspResultInfo { int64_t total; // total generated result size in rows - int64_t rows; // current result set size in rows int64_t capacity; // capacity of current result output buffer int32_t threshold; // result size threshold in rows. -} SResultRec; +} SRspResultInfo; typedef struct SResultRowInfo { SResultRow** pResult; // result list @@ -185,7 +184,6 @@ typedef struct SSDataBlock { SDataBlockInfo info; } SSDataBlock; - typedef struct SQuery { SLimitVal limit; @@ -227,9 +225,6 @@ typedef struct SQuery { SSingleColumnFilterInfo* pFilterInfo; uint32_t status; // query status - SResultRec rec; - int32_t pos; - tFilePage** sdata; STableQueryInfo* current; int32_t numOfCheckedBlocks; // number of check data blocks @@ -266,15 +261,15 @@ typedef struct SQueryRuntimeEnv { char* tagVal; // tag value of current data block SArithmeticSupport *sasArray; - struct SOperatorInfo* pi; SSDataBlock *outputBuf; - int32_t groupIndex; int32_t tableIndex; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo *proot; SGroupResInfo groupResInfo; int64_t currentOffset; // dynamic offset value + + SRspResultInfo resultInfo; } SQueryRuntimeEnv; enum { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index a9dc1f4ee9..8bebbbd24b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -209,9 +209,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator); void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); -// setup the output buffer -// TODO prepare the output buffer dynamically -static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) { +// setup the output buffer for each operator +static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); res->info.numOfCols = numOfOutput; @@ -221,7 +220,7 @@ static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) { idata.info.type = pExpr[i].type; idata.info.bytes = pExpr[i].bytes; idata.info.colId = pExpr[i].base.resColId; - idata.pData = calloc(4096, idata.info.bytes); + idata.pData = calloc(numOfRows, idata.info.bytes); taosArrayPush(res->pDataBlock, &idata); } @@ -862,9 +861,6 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo return num; } -static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock); - - static void doBlockwiseApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow *pWin, int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal, int32_t numOfOutput) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -995,6 +991,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) return ekey; } +#if 0 //todo binary search static UNUSED_FUNC void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock); @@ -1009,7 +1006,6 @@ static UNUSED_FUNC void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { return NULL; } - // todo refactor static UNUSED_FUNC char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { if (pDataBlock == NULL) { @@ -1057,6 +1053,7 @@ static UNUSED_FUNC char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, i return dataBlock; } +#endif static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) { if (type == RESULT_ROW_START_INTERP) { @@ -1086,13 +1083,14 @@ static UNUSED_FUNC void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDat } } -static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, const TSKEY* tsCols, int32_t step) { +static TSKEY getStartTsKey(SQuery* pQuery, STimeWindow* win, const TSKEY* tsCols, int32_t rows) { TSKEY ts = TSKEY_INITIAL_VAL; + bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); if (tsCols == NULL) { - ts = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.skey : pDataBlockInfo->window.ekey; + ts = ascQuery? win->skey : win->ekey; } else { - int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); + int32_t offset = ascQuery? 0:rows-1; ts = tsCols[offset]; } @@ -1216,10 +1214,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1); - int32_t startPos = pQuery->pos; - - TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step); + int32_t startPos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1); + TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery); bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN) ? true : false; @@ -1234,7 +1230,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t forwardStep = 0; TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = - getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true); + getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // prev time window not interpolation yet. int32_t curIndex = curTimeWindowIndex(pResultRowInfo); @@ -1313,9 +1309,7 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat int16_t type = pColInfoData->info.type; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { - int32_t offset = GET_COL_DATA_POS(pQuery, j, 1); - - char *val = pColInfoData->pData + bytes * offset; + char *val = pColInfoData->pData + bytes * j; if (isNull(val, type)) { // ignore the null value continue; } @@ -1330,7 +1324,7 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here int32_t functionId = pInfo->binfo.pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], offset); + aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j); } } } @@ -1886,10 +1880,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl // update the number of output result if (numOfRes > 0 && pQuery->checkResultBuf == 1) { - assert(numOfRes >= pQuery->rec.rows); - pQuery->rec.rows = numOfRes; + assert(numOfRes >= pRuntimeEnv->resultInfo.rows); + pRuntimeEnv->resultInfo.rows = numOfRes; - if (numOfRes >= pQuery->rec.threshold) { + if (numOfRes >= pRuntimeEnv->resultInfo.threshold) { setQueryStatus(pQuery, QUERY_RESBUF_FULL); } @@ -1954,7 +1948,7 @@ void UNUSED_FUNC setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inpu pCtx->preAggVals.dataBlockLoaded = (inputData != NULL); // limit/offset query will affect this value - pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1; +// pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1; // set the start position in current block // int32_t offset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1); @@ -2139,6 +2133,8 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { return NULL; } +static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); + static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, int16_t order, int32_t vgId) { qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; @@ -2148,6 +2144,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pQuery->interBufSize = getOutputInterResultBufSize(pQuery); + calResultBufSize(pQuery, &pRuntimeEnv->resultInfo); + pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t)); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); @@ -2177,11 +2175,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf // interval (down sampling operation) if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pQuery->stableQuery) { - pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); + pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); } else { - pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); + pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); @@ -2194,20 +2192,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } else if (pQuery->groupbyColumn) { - pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); + pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); } } else if (isFixedOutputQuery(pQuery)) { if (!pQuery->stableQuery) { - pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); + pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); } else { - pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); + pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); } - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); @@ -2215,8 +2213,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); if (!onlyQueryTags(pQuery)) { - pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->info, pRuntimeEnv->proot); } } @@ -2261,14 +2259,14 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { qDebug("QInfo:%p teardown runtime env", pQInfo); - if (isTsCompQuery(pQuery)) { - FILE *f = *(FILE **)pQuery->sdata[0]->data; - - if (f) { - fclose(f); - *(FILE **)pQuery->sdata[0]->data = NULL; - } - } +// if (isTsCompQuery(pQuery)) { +// FILE *f = *(FILE **)pQuery->sdata[0]->data; +// +// if (f) { +// fclose(f); +// *(FILE **)pQuery->sdata[0]->data = NULL; +// } +// } if (pRuntimeEnv->sasArray != NULL) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -3510,7 +3508,7 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput; } - memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pQuery->rec.capacity)); + memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pRuntimeEnv->resultInfo.capacity)); } initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); @@ -3633,16 +3631,16 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx) { void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (pQuery->rec.rows == 0 || pQuery->limit.offset == 0) { + if (pRuntimeEnv->resultInfo.rows == 0 || pQuery->limit.offset == 0) { return; } - if (pQuery->rec.rows <= pQuery->limit.offset) { - qDebug("QInfo:%p skip rows:%" PRId64 ", new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pQuery->rec.rows, - pQuery->limit.offset - pQuery->rec.rows); + if (pRuntimeEnv->resultInfo.rows <= pQuery->limit.offset) { + qDebug("QInfo:%p skip rows:%" PRId64 ", new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pRuntimeEnv->resultInfo.rows, + pQuery->limit.offset - pRuntimeEnv->resultInfo.rows); - pQuery->limit.offset -= pQuery->rec.rows; - pQuery->rec.rows = 0; + pQuery->limit.offset -= pRuntimeEnv->resultInfo.rows; + pRuntimeEnv->resultInfo.rows = 0; resetDefaultResInfoOutputBuf(pRuntimeEnv); @@ -3650,25 +3648,25 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL); } else { int64_t numOfSkip = pQuery->limit.offset; - pQuery->rec.rows -= numOfSkip; + pRuntimeEnv->resultInfo.rows -= numOfSkip; pQuery->limit.offset = 0; qDebug("QInfo:%p skip row:%"PRId64", new offset:%d, numOfRows remain:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), numOfSkip, - 0, pQuery->rec.rows); + 0, pRuntimeEnv->resultInfo.rows); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pExpr1[i].base.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pQuery->rec.rows * bytes)); - pRuntimeEnv->pCtx[i].pOutput = ((char*) pQuery->sdata[i]->data) + pQuery->rec.rows * bytes; + memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pRuntimeEnv->resultInfo.rows * bytes)); + pRuntimeEnv->pCtx[i].pOutput = ((char*) pQuery->sdata[i]->data) + pRuntimeEnv->resultInfo.rows * bytes; if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput; } } - updateNumOfResult(pRuntimeEnv, (int32_t)pQuery->rec.rows); + updateNumOfResult(pRuntimeEnv, (int32_t)pRuntimeEnv->resultInfo.rows); } } @@ -4428,7 +4426,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { // SQuery *pQuery = pRuntimeEnv->pQuery; // // int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); -// int32_t numOfResult = pQuery->rec.rows; // there are already exists result rows +// int32_t numOfResult = pRuntimeEnv->resultInfo.rows; // there are already exists result rows // // int32_t start = 0; // int32_t step = -1; @@ -4452,8 +4450,8 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { // int32_t numOfRowsToCopy = pRow->numOfRows; // // //current output space is not enough to accommodate all data of this page, prepare more space -// if (numOfRowsToCopy > (pQuery->rec.capacity - numOfResult)) { -// int32_t newSize = pQuery->rec.capacity + (numOfRowsToCopy - numOfResult); +// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { +// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); // expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv)); // } // @@ -4469,7 +4467,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { // } // // numOfResult += numOfRowsToCopy; -// if (numOfResult == pQuery->rec.capacity) { // output buffer is full +// if (numOfResult == pRuntimeEnv->resultInfo.capacity) { // output buffer is full // break; // } // } @@ -4516,8 +4514,8 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG int32_t numOfRowsToCopy = pRow->numOfRows; //current output space is not enough to accommodate all data of this page, prepare more space -// if (numOfRowsToCopy > (pQuery->rec.capacity - numOfResult)) { -// int32_t newSize = pQuery->rec.capacity + (numOfRowsToCopy - numOfResult); +// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { +// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); // expandBuffer(pRuntimeEnv, newSize, GET_QINFO_ADDR(pRuntimeEnv)); // } @@ -4538,7 +4536,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG } numOfResult += numOfRowsToCopy; - if (numOfResult == pQuery->rec.capacity) { // output buffer is full + if (numOfResult == pRuntimeEnv->resultInfo.capacity) { // output buffer is full break; } } @@ -4599,7 +4597,7 @@ static UNUSED_FUNC void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEn STableQueryInfo* pTableQueryInfo = pQuery->current; SResultRowInfo * pResultRowInfo = &pTableQueryInfo->resInfo; - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; +// pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; // if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pQuery->groupbyColumn) { // rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock); @@ -4620,7 +4618,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR return false; } - if (pQuery->limit.limit > 0 && pQuery->rec.total >= pQuery->limit.limit) { + if (pQuery->limit.limit > 0 && pRuntimeEnv->resultInfo.total >= pQuery->limit.limit) { return false; } @@ -4639,7 +4637,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR * set is the FIRST result block, the gap between the start time of query time window and the timestamp of the * first result row in the actual result set will fill nothing. */ - int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pQuery->rec.capacity); + int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity); return numOfTotal > 0; } else { // there are results waiting for returned to client. if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && @@ -4717,7 +4715,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; while (1) { - int32_t ret = (int32_t)taosFillResultDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pQuery->rec.capacity); + int32_t ret = (int32_t)taosFillResultDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pRuntimeEnv->resultInfo.capacity); // todo apply limit output function /* reached the start position of according to offset value, return immediately */ @@ -4748,7 +4746,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { } // no data in current data after fill - int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pQuery->rec.capacity); + int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pRuntimeEnv->resultInfo.capacity); if (numOfTotal == 0) { return 0; } @@ -4757,7 +4755,6 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { #endif int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutput) { - SQuery *pQuery = pRuntimeEnv->pQuery; SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES); @@ -4766,7 +4763,7 @@ int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutp p[i] = pColInfoData->pData; } - pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pQuery->rec.capacity); + pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pRuntimeEnv->resultInfo.capacity); tfree(p); return pOutput->info.rows; } @@ -5207,9 +5204,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts if (onlyQueryTags(pQuery)) { pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); } else if (needReverseScan(pQuery)) { - pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); + pRuntimeEnv->proot = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { - pRuntimeEnv->pi = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); + pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); } if (pTsBuf != NULL) { @@ -5252,7 +5249,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w); int32_t numOfCols = getNumOfFinalResCol(pQuery); - pRuntimeEnv->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, numOfCols, + pRuntimeEnv->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfCols, pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision, pQuery->fillType, pColInfo, pQInfo); } @@ -5561,7 +5558,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { int64_t numOfRes = getNumOfResult(pRuntimeEnv); if (numOfRes > 0) { - pQuery->rec.rows += numOfRes; + pRuntimeEnv->resultInfo.rows += numOfRes; forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } @@ -5571,7 +5568,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { // enable execution for next table, when handling the projection query enableExecutionForNextTable(pRuntimeEnv); - if (pQuery->rec.rows >= pQuery->rec.capacity) { + if (pRuntimeEnv->resultInfo.rows >= pRuntimeEnv->resultInfo.capacity) { setQueryStatus(pQuery, QUERY_RESBUF_FULL); break; } @@ -5636,14 +5633,14 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size, pRuntimeEnv->groupIndex); - pQuery->rec.rows = 0; - if (pWindowResInfo->size > pQuery->rec.capacity) { + pRuntimeEnv->resultInfo.rows = 0; + if (pWindowResInfo->size > pRuntimeEnv->resultInfo.capacity) { expandBuffer(pRuntimeEnv, pWindowResInfo->size, pQInfo); } initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); copyToOutputBuf(pRuntimeEnv, pWindowResInfo); - assert(pQuery->rec.rows == pWindowResInfo->size); + assert(pRuntimeEnv->resultInfo.rows == pWindowResInfo->size); resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); cleanupGroupResInfo(&pRuntimeEnv->groupResInfo); @@ -5709,7 +5706,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { } // it is a super table ordered projection query, check for the number of output for each vgroup - if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->rec.rows >= pQuery->prjInfo.vgroupLimit) { + if (pQuery->prjInfo.vgroupLimit > 0 && pRuntimeEnv->resultInfo.rows >= pQuery->prjInfo.vgroupLimit) { if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) { pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; @@ -5748,9 +5745,9 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey); - pQuery->rec.rows = getNumOfResult(pRuntimeEnv); + pRuntimeEnv->resultInfo.rows = getNumOfResult(pRuntimeEnv); - int64_t inc = pQuery->rec.rows - prev; + int64_t inc = pRuntimeEnv->resultInfo.rows - prev; pQuery->current->resInfo.size += (int32_t) inc; // the flag may be set by tableApplyFunctionsOnBlock, clear it here @@ -5759,7 +5756,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); if (pQuery->prjInfo.vgroupLimit >= 0) { - if (((pQuery->rec.rows + pQuery->rec.total) < pQuery->prjInfo.vgroupLimit) || ((pQuery->rec.rows + pQuery->rec.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) { + if (((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) < pQuery->prjInfo.vgroupLimit) || ((pRuntimeEnv->resultInfo.rows + pRuntimeEnv->resultInfo.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) { if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) { pQuery->prjInfo.ts = blockInfo.window.ekey; } else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) { @@ -5794,9 +5791,9 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { */ if (hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - pQuery->rec.total += pQuery->rec.rows; + pRuntimeEnv->resultInfo.total += pRuntimeEnv->resultInfo.rows; - if (pQuery->rec.rows > 0) { + if (pRuntimeEnv->resultInfo.rows > 0) { return; } } @@ -5865,7 +5862,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { } else { // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter - if (pQuery->rec.rows == 0) { + if (pRuntimeEnv->resultInfo.rows == 0) { assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); continue; } else { @@ -5901,8 +5898,8 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64, - pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pQuery->rec.rows, - pQuery->rec.total, pQuery->limit.offset); + pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pRuntimeEnv->resultInfo.rows, + pRuntimeEnv->resultInfo.total, pQuery->limit.offset); } } @@ -5985,7 +5982,7 @@ static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) { copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); } - qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total); + qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total); return; } @@ -6034,7 +6031,7 @@ static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) { } // handle the limitation of output buffer - qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); + qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total + pRuntimeEnv->resultInfo.rows); } static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { @@ -6153,7 +6150,6 @@ static SSDataBlock* doTableScan(void* param) { pTableScanInfo->order = cond.order; // todo refactor, extract function - SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; pResultRowInfo->curIndex = pResultRowInfo->size-1; pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey; @@ -6274,7 +6270,6 @@ static SSDataBlock* doAggregate(void* param) { int32_t order = pQuery->order.order; SOperatorInfo* upstream = pOperator->upstream; - pQuery->pos = 0; while(1) { SSDataBlock* pBlock = upstream->exec(upstream); @@ -6327,7 +6322,6 @@ static SSDataBlock* doSTableAggregate(void* param) { int32_t order = pQuery->order.order; SOperatorInfo* upstream = pOperator->upstream; - pQuery->pos = 0; while(1) { SSDataBlock* pBlock = upstream->exec(upstream); @@ -6371,7 +6365,6 @@ static SSDataBlock* doArithmeticOperation(void* param) { SArithOperatorInfo* pArithInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - pRuntimeEnv->pQuery->pos = 0; pArithInfo->binfo.pRes->info.rows = 0; while(1) { @@ -6494,7 +6487,6 @@ static SSDataBlock* doHashIntervalAgg(void* param) { STimeWindow win = pQuery->window; SOperatorInfo* upstream = pOperator->upstream; - pQuery->pos = 0; while(1) { SSDataBlock* pBlock = upstream->exec(upstream); @@ -6548,7 +6540,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { int32_t order = pQuery->order.order; SOperatorInfo* upstream = pOperator->upstream; - pQuery->pos = 0; while(1) { SSDataBlock* pBlock = upstream->exec(upstream); @@ -6599,7 +6590,6 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { } SOperatorInfo* upstream = pOperator->upstream; - pRuntimeEnv->pQuery->pos = 0; while(1) { SSDataBlock* pBlock = upstream->exec(upstream); @@ -6703,7 +6693,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6760,7 +6750,7 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6784,10 +6774,10 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); int64_t seed = rand(); - pInfo->bufCapacity = 4096; + pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; SOptrBasicInfo* pBInfo = &pInfo->binfo; - pBInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6848,7 +6838,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6872,7 +6862,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6896,7 +6886,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe pInfo->colIndex = -1; // group by column index pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6918,7 +6908,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6945,7 +6935,6 @@ static SSDataBlock* doTagScan(void* param) { STagScanInfo *pTagScanInfo = pOperator->info; SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); @@ -6978,7 +6967,7 @@ static SSDataBlock* doTagScan(void* param) { } } - while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) { + while(pRuntimeEnv->tableIndex < num && count < pRuntimeEnv->resultInfo.capacity) { int32_t i = pRuntimeEnv->tableIndex++; STableQueryInfo *item = taosArrayGetP(pa, i); @@ -7021,7 +7010,7 @@ static SSDataBlock* doTagScan(void* param) { qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count); } else*/ { // return only the tags|table name etc. count = 0; - int32_t maxNumOfTables = (int32_t)pQuery->rec.capacity; + int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; SExprInfo* pExprInfo = pOperator->pExpr; while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) { @@ -7063,7 +7052,7 @@ static SSDataBlock* doTagScan(void* param) { SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqTagScanOp"; @@ -7093,10 +7082,8 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0; } - void tableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -7108,29 +7095,29 @@ void tableQueryImpl(SQInfo *pQInfo) { * There are remain results that are not returned due to result interpolation * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ -// pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); - if (pQuery->rec.rows > 0) { +// pRuntimeEnv->resultInfo.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); + if (pRuntimeEnv->resultInfo.rows > 0) { limitOperator(pQuery, pQInfo); - qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total); + qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total); } else { return copyAndFillResult(pQInfo); } } else { - pQuery->rec.rows = 0; + pRuntimeEnv->resultInfo.rows = 0; assert(pRuntimeEnv->resultRowInfo.size > 0); copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); doSecondaryArithmeticProcess(pQuery); - if (pQuery->rec.rows > 0) { + if (pRuntimeEnv->resultInfo.rows > 0) { limitOperator(pQuery, pQInfo); } - if (pQuery->rec.rows > 0) { - qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pQuery->rec.rows, - pQuery->rec.total); + if (pRuntimeEnv->resultInfo.rows > 0) { + qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pRuntimeEnv->resultInfo.rows, + pRuntimeEnv->resultInfo.total); } else { - qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total); + qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pRuntimeEnv->resultInfo.total); } } @@ -7139,7 +7126,6 @@ void tableQueryImpl(SQInfo *pQInfo) { #endif // number of points returned during this query - pQuery->rec.rows = 0; int64_t st = taosGetTimestampUs(); assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); @@ -7149,7 +7135,6 @@ void tableQueryImpl(SQInfo *pQInfo) { pQuery->current = item; pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0; // record the total elapsed time pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); @@ -7159,7 +7144,7 @@ void tableQueryImpl(SQInfo *pQInfo) { void buildTableBlockDistResult(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - pQuery->pos = 0; +// pQuery->pos = 0; STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist)); pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SDataBlockInfo)); @@ -7199,19 +7184,18 @@ void buildTableBlockDistResult(SQInfo *pQInfo) { type = blockDistSchema.type; } assert(type == TSDB_DATA_TYPE_BINARY); - STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result)); +// STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result)); } freeTableBlockDist(pTableBlockDist); - pQuery->rec.rows = 1; +// pRuntimeEnv->resultInfo.rows = 1; setQueryStatus(pQuery, QUERY_COMPLETED); } void stableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; - pQuery->rec.rows = 0; +// pRuntimeEnv->resultInfo.rows = 0; int64_t st = taosGetTimestampUs(); @@ -7219,7 +7203,6 @@ void stableQueryImpl(SQInfo *pQInfo) { // (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) { //multiTableQueryProcess(pQInfo); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = pRuntimeEnv->outputBuf != NULL? pRuntimeEnv->outputBuf->info.rows:0; // } else { // assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn); // sequentialTableProcess(pQInfo); @@ -7971,7 +7954,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { } } -static void calResultBufSize(SQuery* pQuery) { +static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) { const int32_t RESULT_MSG_MIN_SIZE = 1024 * (1024 + 512); // bytes const int32_t RESULT_MSG_MIN_ROWS = 8192; const float RESULT_THRESHOLD_RATIO = 0.85f; @@ -7982,11 +7965,11 @@ static void calResultBufSize(SQuery* pQuery) { numOfRes = RESULT_MSG_MIN_ROWS; } - pQuery->rec.capacity = numOfRes; - pQuery->rec.threshold = (int32_t)(numOfRes * RESULT_THRESHOLD_RATIO); + pResultInfo->capacity = numOfRes; + pResultInfo->threshold = (int32_t)(numOfRes * RESULT_THRESHOLD_RATIO); } else { // in case of non-prj query, a smaller output buffer will be used. - pQuery->rec.capacity = 4096; - pQuery->rec.threshold = (int32_t)(pQuery->rec.capacity * RESULT_THRESHOLD_RATIO); + pResultInfo->capacity = 4096; + pResultInfo->threshold = (int32_t)(pResultInfo->capacity * RESULT_THRESHOLD_RATIO); } } @@ -8058,29 +8041,27 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr } // prepare the result buffer - pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES); - if (pQuery->sdata == NULL) { - goto _cleanup; - } - - calResultBufSize(pQuery); - - for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { - // allocate additional memory for interResults that are usually larger then final results - // TODO refactor - int16_t bytes = 0; - if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) { - bytes = pExprs[col].bytes; - } else { - bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes); - } +// pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES); +// if (pQuery->sdata == NULL) { +// goto _cleanup; +// } - size_t size = (size_t)((pQuery->rec.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage)); - pQuery->sdata[col] = (tFilePage *)calloc(1, size); - if (pQuery->sdata[col] == NULL) { - goto _cleanup; - } - } +// for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { +// // allocate additional memory for interResults that are usually larger then final results +// // TODO refactor +// int16_t bytes = 0; +// if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) { +// bytes = pExprs[col].bytes; +// } else { +// bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes); +// } +// +// size_t size = (size_t)((pRuntimeEnv->resultInfo.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage)); +// pQuery->sdata[col] = (tFilePage *)calloc(1, size); +// if (pQuery->sdata[col] == NULL) { +// goto _cleanup; +// } +// } if (pQuery->fillType != TSDB_FILL_NONE) { pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); @@ -8115,7 +8096,6 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr pthread_mutex_init(&pQInfo->lock, NULL); tsem_init(&pQInfo->ready, 0, 0); - pQuery->pos = -1; pQuery->window = pQueryMsg->window; changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); @@ -8326,13 +8306,6 @@ void freeQInfo(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (pQuery != NULL) { - if (pQuery->sdata != NULL) { - for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { - tfree(pQuery->sdata[col]); - } - tfree(pQuery->sdata); - } - if (pQuery->fillVal != NULL) { tfree(pQuery->fillVal); } @@ -8391,7 +8364,7 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { */ if (isTsCompQuery(pQuery) && (*numOfRows) > 0) { struct stat fStat; - FILE *f = *(FILE **)pQuery->sdata[0]->data; + FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data; if ((f != NULL) && (fstat(fileno(f), &fStat) == 0)) { *numOfRows = fStat.st_size; return fStat.st_size; @@ -8406,12 +8379,13 @@ size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // the remained number of retrieved rows, not the interpolated result + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; // load data from file to msg buffer if (isTsCompQuery(pQuery)) { - FILE *f = *(FILE **)pQuery->sdata[0]->data; // TODO refactor + FILE *f = NULL;//*(FILE **)pQuery->sdata[0]->data; // TODO refactor // make sure file exist if (f) { @@ -8437,7 +8411,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { } fclose(f); - *(FILE **)pQuery->sdata[0]->data = NULL; +// *(FILE **)pQuery->sdata[0]->data = NULL; } // all data returned, set query over @@ -8445,13 +8419,14 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { setQueryStatus(pQuery, QUERY_OVER); } } else { - doCopyQueryResultToMsg(pQInfo, (int32_t)pQuery->rec.rows, data); + doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); } - pQuery->rec.total += pQuery->rec.rows; - qDebug("QInfo:%p current numOfRes rows:%" PRId64 ", total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total); + pRuntimeEnv->resultInfo.total += pRuntimeEnv->outputBuf->info.rows; + qDebug("QInfo:%p current numOfRes rows:%d, total:%" PRId64, pQInfo, + pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total); - if (pQuery->limit.limit > 0 && pQuery->limit.limit == pQuery->rec.total) { + if (pQuery->limit.limit > 0 && pQuery->limit.limit == pRuntimeEnv->resultInfo.total) { qDebug("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit); setQueryStatus(pQuery, QUERY_OVER); } @@ -8494,7 +8469,6 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); @@ -8504,9 +8478,9 @@ void buildTagQueryResult(SQInfo* pQInfo) { } pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0; return; +#if 0 SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); size_t num = taosArrayGetSize(pa); @@ -8532,11 +8506,11 @@ void buildTagQueryResult(SQInfo* pQInfo) { } } - while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) { + while(pRuntimeEnv->tableIndex < num && count < pRuntimeEnv->resultInfo.capacity) { int32_t i = pRuntimeEnv->tableIndex++; STableQueryInfo *item = taosArrayGetP(pa, i); - char *output = pQuery->sdata[0]->data + count * rsize; + char *output = NULL;//pQuery->sdata[0]->data + count * rsize; varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); output = varDataVal(output); @@ -8568,7 +8542,7 @@ void buildTagQueryResult(SQInfo* pQInfo) { qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, count); } else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query - *(int64_t*) pQuery->sdata[0]->data = num; +// *(int64_t*) pQuery->sdata[0]->data = num; count = 1; SET_STABLE_QUERY_OVER(pRuntimeEnv); @@ -8577,8 +8551,8 @@ void buildTagQueryResult(SQInfo* pQInfo) { count = 0; SSchema* tbnameSchema = tGetTbnameColumnSchema(); - int32_t maxNumOfTables = (int32_t)pQuery->rec.capacity; - if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pQuery->rec.capacity) { + int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; + if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pRuntimeEnv->resultInfo.capacity) { maxNumOfTables = (int32_t)pQuery->limit.limit; } @@ -8607,13 +8581,13 @@ void buildTagQueryResult(SQInfo* pQInfo) { type = tbnameSchema->type; data = tsdbGetTableName(item->pTable); - dst = pQuery->sdata[j]->data + count * tbnameSchema->bytes; +// dst = pQuery->sdata[j]->data + count * tbnameSchema->bytes; } else { type = pExprInfo[j].type; bytes = pExprInfo[j].bytes; data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes); - dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes; +// dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes; } @@ -8625,8 +8599,9 @@ void buildTagQueryResult(SQInfo* pQInfo) { qDebug("QInfo:%p create tag values results completed, rows:%d", pQInfo, count); } - pQuery->rec.rows = count; + pRuntimeEnv->resultInfo.rows = count; setQueryStatus(pQuery, QUERY_COMPLETED); +#endif } static int64_t getQuerySupportBufSize(size_t numOfTables) { diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 38c2ff83d5..81221ed37a 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -244,14 +244,15 @@ bool qTableQuery(qinfo_t qinfo) { tableQueryImpl(pQInfo); } - SQuery* pQuery = pRuntimeEnv->pQuery; if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p query is killed", pQInfo); - } else if (pQuery->rec.rows == 0) { - qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); + } else if (pRuntimeEnv->outputBuf->info.rows == 0) { + qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, + pRuntimeEnv->resultInfo.total); } else { - qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", - pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); + qDebug("QInfo:%p query paused, %d rows returned, numOfTotal:%" PRId64 " rows", + pQInfo, pRuntimeEnv->outputBuf->info.rows, + pRuntimeEnv->resultInfo.total + pRuntimeEnv->outputBuf->info.rows); } return doBuildResCheck(pQInfo); @@ -279,6 +280,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex *buildRes = true; code = pQInfo->code; } else { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; pthread_mutex_lock(&pQInfo->lock); @@ -286,8 +288,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex assert(pQInfo->rspContext == NULL); if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; - qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->resultRowSize, - pQuery->rec.rows, tstrerror(pQInfo->code)); + qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize, + pRuntimeEnv->outputBuf->info.rows, tstrerror(pQInfo->code)); } else { *buildRes = false; qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); @@ -310,7 +312,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - size_t size = getResultSize(pQInfo, &pQuery->rec.rows); + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + int64_t s = pRuntimeEnv->outputBuf->info.rows; + + size_t size = getResultSize(pQInfo, &s); size += sizeof(int32_t); size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo); @@ -323,7 +328,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co return TSDB_CODE_QRY_OUT_OF_MEMORY; } - (*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows); + (*pRsp)->numOfRows = htonl((int32_t)s); if (pQInfo->code == TSDB_CODE_SUCCESS) { (*pRsp)->offset = htobe64(pQInfo->runtimeEnv.currentOffset); @@ -334,7 +339,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQuery->precision); - if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { + if (pQInfo->runtimeEnv.outputBuf->info.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { doDumpQueryResult(pQInfo, (*pRsp)->data); } else { setQueryStatus(pQuery, QUERY_OVER); -- GitLab