From 4c04ce09284eae6715f0b64625102e257404d7c9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Dec 2020 13:58:06 +0800 Subject: [PATCH] [TD-2434]: reduce the memory requirement during super table interval query. --- src/client/src/tscAsync.c | 2 +- src/client/src/tscFunctionImpl.c | 18 +- src/query/inc/qExecutor.h | 14 +- src/query/inc/qHistogram.h | 2 +- src/query/inc/qUtil.h | 1 - src/query/src/qExecutor.c | 937 +++++++++++-------------------- src/query/src/qHistogram.c | 9 +- src/query/src/qUtil.c | 23 +- 8 files changed, 351 insertions(+), 655 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 97b962e53a..ad7041db10 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -388,10 +388,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) { return; } + assert(pSql->res.code != TSDB_CODE_SUCCESS); tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); SSqlRes *pRes = &pSql->res; - if (pSql->fp == NULL || pSql->fetchFp == NULL){ return; } diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index f313d355f1..6afc5ba223 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2597,14 +2597,23 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) { } ////////////////////////////////////////////////////////////////////////////////// +static void buildHistogramInfo(SAPercentileInfo* pInfo) { + pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo)); + pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo)); +} + static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - + SAPercentileInfo* pInfo = NULL; + if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) { - return (SAPercentileInfo*) pCtx->aOutputBuf; + pInfo = (SAPercentileInfo*) pCtx->aOutputBuf; } else { - return GET_ROWCELL_INTERBUF(pResInfo); + pInfo = GET_ROWCELL_INTERBUF(pResInfo); } + + buildHistogramInfo(pInfo); + return pInfo; } static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { @@ -2616,6 +2625,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); + printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems); return true; } @@ -2624,6 +2634,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo *pInfo = getAPerctInfo(pCtx); + + assert(pInfo->pHisto->elems != NULL); for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_CHAR_INDEX(pCtx, i); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index d1278e2eee..201b3b2abc 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -33,13 +33,6 @@ struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); -typedef struct SGroupResInfo { - int32_t groupId; - int32_t numOfDataPages; - int32_t pageId; - int32_t rowId; -} SGroupResInfo; - typedef struct SResultRowPool { int32_t elemSize; int32_t blockSize; @@ -72,6 +65,12 @@ typedef struct SResultRow { union {STimeWindow win; char* key;}; // start key of current time window } SResultRow; +typedef struct SGroupResInfo { + int32_t rowId; + int32_t index; + SArray* pRows; // SArray +} SGroupResInfo; + /** * If the number of generated results is greater than this value, * query query will be halt and return results to client immediate. @@ -89,7 +88,6 @@ typedef struct SResultRowInfo { int32_t size:24; // number of result set int32_t capacity; // max capacity int32_t curIndex; // current start active index - int64_t startTime; // start time of the first time window for sliding query int64_t prevSKey; // previous (not completed) sliding window start key } SResultRowInfo; diff --git a/src/query/inc/qHistogram.h b/src/query/inc/qHistogram.h index 442e61750b..c904d0ec9b 100644 --- a/src/query/inc/qHistogram.h +++ b/src/query/inc/qHistogram.h @@ -67,7 +67,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto); void tHistogramPrint(SHistogramInfo* pHisto); -int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val); +//int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val); SHeapEntry* tHeapCreate(int32_t numOfEntries); void tHeapSort(SHeapEntry* pEntry, int32_t len); diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index dde2e39845..974d93f89c 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -77,7 +77,6 @@ void* destroyResultRowPool(SResultRowPool* p); int32_t getNumOfAllocatedResultRows(SResultRowPool* p); int32_t getNumOfUsedResultRows(SResultRowPool* p); -uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv); bool isPointInterpoQuery(SQuery *pQuery); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b0c7cd3a62..c22764697b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -178,11 +178,10 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { #define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables)) // todo move to utility -static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group); +static int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableList, SQInfo* pQInfo); static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); -static void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, @@ -195,7 +194,6 @@ static bool hasMainOutput(SQuery *pQuery); static void buildTagQueryResult(SQInfo *pQInfo); static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo); -static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo); static int32_t checkForQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); @@ -291,7 +289,7 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { } } -static int32_t getMergeResultGroupId(int32_t groupIndex) { +static UNUSED_FUNC int32_t getMergeResultGroupId(int32_t groupIndex) { int32_t base = 50000000; return base + (groupIndex * 10000); } @@ -466,16 +464,34 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { + bool existed = false; SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); - int32_t *p1 = - (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - if (p1 != NULL) { - pResultRowInfo->curIndex = *p1; + + SResultRow **p1 = + (SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + + // in case of repeat scan/reverse scan, no new time window added. + if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { + if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists. + return (p1 != NULL)? *p1:NULL; + } + + if (p1 != NULL) { + for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) { + if (pResultRowInfo->pResult[i] == (*p1)) { + pResultRowInfo->curIndex = i; + existed = true; + break; + } + } + } } else { - if (!masterscan) { // not master scan, do not add new timewindow - return NULL; + if (p1 != NULL) { // group by column query + return *p1; } + } + if (!existed) { // TODO refactor // more than the capacity, reallocate the resources if (pResultRowInfo->size >= pResultRowInfo->capacity) { @@ -499,17 +515,23 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes pResultRowInfo->capacity = (int32_t)newCapacity; } - SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool); - pResultRowInfo->pResult[pResultRowInfo->size] = pResult; - int32_t ret = initResultRow(pResult); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + SResultRow *pResult = NULL; + + if (p1 == NULL) { + pResult = getNewResultRow(pRuntimeEnv->pool); + int32_t ret = initResultRow(pResult); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + // add a new result set for a new group + taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES); + } else { + pResult = *p1; } - // add a new result set for a new group + pResultRowInfo->pResult[pResultRowInfo->size] = pResult; pResultRowInfo->curIndex = pResultRowInfo->size++; - taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), - (char *)&pResultRowInfo->curIndex, sizeof(int32_t)); } // too many time window in query @@ -591,7 +613,6 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf if (pData->num >= numOfRowsPerPage) { // release current page first, and prepare the next one releaseResBufPageInfo(pResultBuf, pi); - pData = getNewDataBuf(pResultBuf, tid, &pageId); if (pData != NULL) { assert(pData->num == 0); // number of elements must be 0 for new allocated buffer @@ -614,24 +635,20 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf return 0; } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SDataBlockInfo* pBockInfo, - STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { +static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win, + bool masterscan, SResultRow** pResult, int64_t groupId) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - // todo refactor - int64_t uid = getResultInfoUId(pRuntimeEnv); - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId); if (pResultRow == NULL) { - *newWind = false; - return masterscan? -1:0; // no master scan, no result generated means error occurs + *pResult = NULL; + return TSDB_CODE_SUCCESS; } - *newWind = true; - // not assign result buffer yet, add new result buffer if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, pBockInfo->tid, pRuntimeEnv->numOfRowsPerPage); + int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupId, pRuntimeEnv->numOfRowsPerPage); if (ret != TSDB_CODE_SUCCESS) { return -1; } @@ -701,81 +718,47 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se return forwardStep; } -static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY lastKey, bool ascQuery) { - int32_t i = 0; +static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, bool ascQuery) { int64_t skey = TSKEY_INITIAL_VAL; - - int32_t numOfClosed = 0; - for (i = 0; i < pWindowResInfo->size; ++i) { - SResultRow *pResult = pWindowResInfo->pResult[i]; + int32_t i = 0; + for (i = pResultRowInfo->size - 1; i >= 0; --i) { + SResultRow *pResult = pResultRowInfo->pResult[i]; if (pResult->closed) { - numOfClosed += 1; - continue; + break; } - TSKEY ekey = pResult->win.ekey; - if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { - closeResultRow(pWindowResInfo, i); + // new closed result rows + if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { + closeResultRow(pResultRowInfo, i); } else { skey = pResult->win.skey; - break; } } - // all windows are closed, set the last one to be the skey + // all result rows are closed, set the last one to be the skey if (skey == TSKEY_INITIAL_VAL) { - assert(i == pWindowResInfo->size); - pWindowResInfo->curIndex = pWindowResInfo->size - 1; + pResultRowInfo->curIndex = pResultRowInfo->size - 1; } else { - pWindowResInfo->curIndex = i; - pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; - } - - return numOfClosed; -} -/** - * NOTE: the query status only set for the first scan of master scan. - */ -static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) { - SQuery *pQuery = pRuntimeEnv->pQuery; - if (pRuntimeEnv->scanFlag != MASTER_SCAN || pWindowResInfo->size == 0) { - return pWindowResInfo->size; - } - - // no qualified results exist, abort check - int32_t numOfClosed = 0; - bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); - - // query completed - if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) { - closeAllResultRows(pWindowResInfo); - - pWindowResInfo->curIndex = pWindowResInfo->size - 1; - setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); - } else { // set the current index to be the last unclosed window - numOfClosed = updateResultRowCurrentIndex(pWindowResInfo, lastKey, ascQuery); - - // the number of completed slots are larger than the threshold, return current generated results to client. - if (numOfClosed > pQuery->rec.threshold) { - qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return", - GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold); - - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } else { - qDebug("QInfo:%p total result window:%d already closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, - numOfClosed); + for (i = pResultRowInfo->size - 1; i >= 0; --i) { + SResultRow *pResult = pResultRowInfo->pResult[i]; + if (pResult->closed) { + break; + } } - } - // output has reached the limitation, set query completed - if (pQuery->limit.limit > 0 && (pQuery->limit.limit + pQuery->limit.offset) <= numOfClosed && - pRuntimeEnv->scanFlag == MASTER_SCAN) { - setQueryStatus(pQuery, QUERY_COMPLETED); + pResultRowInfo->curIndex = i + 1; // current not closed result object + pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey; } +} - assert(pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL); - return numOfClosed; +static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) { + if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { + closeAllResultRows(pResultRowInfo); + pResultRowInfo->curIndex = pResultRowInfo->size - 1; + } else { + doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey, ascQuery); + } } static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, @@ -818,52 +801,47 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo return num; } -static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset, - int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) { - SQuery * pQuery = pRuntimeEnv->pQuery; +static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) { + SQuery *pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; bool hasPrev = pCtx[0].preAggVals.isSet; - if (IS_MASTER_SCAN(pRuntimeEnv) || closed) { - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - pCtx[k].nStartQueryTimestamp = pWin->skey; - pCtx[k].size = forwardStep; - pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); - - int32_t functionId = pQuery->pExpr1[k].base.functionId; - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - pCtx[k].ptsList = &tsCol[pCtx[k].startOffset]; - } + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + pCtx[k].nStartQueryTimestamp = pWin->skey; + pCtx[k].size = forwardStep; + pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); - // not a whole block involved in query processing, statistics data can not be used - // NOTE: the original value of isSet have been changed here - if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) { - pCtx[k].preAggVals.isSet = false; - } + int32_t functionId = pQuery->pExpr1[k].base.functionId; + if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { + pCtx[k].ptsList = &tsCol[pCtx[k].startOffset]; + } - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); - } + // not a whole block involved in query processing, statistics data can not be used + // NOTE: the original value of isSet have been changed here + if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) { + pCtx[k].preAggVals.isSet = false; + } - // restore it - pCtx[k].preAggVals.isSet = hasPrev; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunction(&pCtx[k]); } + + // restore it + pCtx[k].preAggVals.isSet = hasPrev; } } -static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset) { - SQuery * pQuery = pRuntimeEnv->pQuery; +static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset) { + SQuery *pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - if (IS_MASTER_SCAN(pRuntimeEnv) || closed) { - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - pCtx[k].nStartQueryTimestamp = pWin->skey; + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + pCtx[k].nStartQueryTimestamp = pWin->skey; - int32_t functionId = pQuery->pExpr1[k].base.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); - } + int32_t functionId = pQuery->pExpr1[k].base.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); } } } @@ -1178,7 +1156,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); - SQuery *pQuery = pRuntimeEnv->pQuery; + SQuery *pQuery = pRuntimeEnv->pQuery; + int64_t groupId = pQuery->current->groupIndex; + TSKEY *tsCols = NULL; if (pDataBlock != NULL) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0); @@ -1203,56 +1183,46 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - bool hasTimeWindow = false; SResultRow* pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); - if (ret != TSDB_CODE_SUCCESS) { - tfree(sasArray); - return; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + goto _end; } int32_t forwardStep = 0; int32_t startPos = pQuery->pos; - // in case of repeat scan/reverse scan, no new time window added. - if (hasTimeWindow) { - TSKEY ekey = reviseWindowEkey(pQuery, &win); - forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); - - // prev time window not interpolation yet. - int32_t curIndex = curTimeWindowIndex(pWindowResInfo); - if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) { - for(int32_t j = prevIndex; j < curIndex; ++j) { - SResultRow *pRes = pWindowResInfo->pResult[j]; + TSKEY ekey = reviseWindowEkey(pQuery, &win); + forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); - STimeWindow w = pRes->win; - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &w, masterScan, &hasTimeWindow, &pResult); - assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + // prev time window not interpolation yet. + int32_t curIndex = curTimeWindowIndex(pWindowResInfo); + if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) { + for(int32_t j = prevIndex; j < curIndex; ++j) { + SResultRow *pRes = pWindowResInfo->pResult[j]; - int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1; - doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP); - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); + STimeWindow w = pRes->win; + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId); + assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); - doBlockwiseApplyFunctions(pRuntimeEnv, closed, &w, startPos, 0, tsCols, pDataBlockInfo->rows); - } + int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1; + doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP); + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); - assert (ret == TSDB_CODE_SUCCESS); // null data, too many state code + doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows); } - // window start key interpolation - doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep); - - bool pStatus = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); - doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows); + // restore current time window + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); + assert (ret == TSDB_CODE_SUCCESS); } - int32_t index = pWindowResInfo->curIndex; - STimeWindow nextWin = win; + // window start key interpolation + doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep); + doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows); + STimeWindow nextWin = win; while (1) { int32_t prevEndPos = (forwardStep - 1) * step + startPos; startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos); @@ -1261,27 +1231,19 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } // null data, failed to allocate more memory buffer - hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != - TSDB_CODE_SUCCESS) { + int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { break; } - if (!hasTimeWindow) { - continue; - } - - TSKEY ekey = reviseWindowEkey(pQuery, &nextWin); + ekey = reviseWindowEkey(pQuery, &nextWin); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); // window start(end) key interpolation doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep); - - bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); - doBlockwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); + doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } - pWindowResInfo->curIndex = index; } else { /* * the sqlfunctionCtx parameters should be set done before all functions are invoked, @@ -1297,6 +1259,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } } + _end: if (pRuntimeEnv->timeWindowInterpo) { saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock); } @@ -1317,8 +1280,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat return -1; } - int32_t GROUPRESULTID = 1; - SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; // not assign result buffer yet, add new result buffer, TODO remove it @@ -1334,11 +1295,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - uint64_t uid = groupIndex; - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); - if (pResultRow == NULL) { - return -1; - } + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, groupIndex); + assert (pResultRow != NULL); int64_t v = -1; GET_TYPED_DATA(v, int64_t, type, pData); @@ -1355,7 +1313,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage); + int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage); if (ret != 0) { return -1; } @@ -1490,7 +1448,7 @@ void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDa double v1 = 0, v2 = 0, v = 0; if (prevRowIndex == -1) { - GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[k]); + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[index]); } else { GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); } @@ -1557,6 +1515,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; + int64_t groupId = item->groupIndex; + SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0); TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; @@ -1627,15 +1587,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - bool hasTimeWindow = false; SResultRow* pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - continue; - } - - if (!hasTimeWindow) { - continue; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // null data, too many state code + goto _end; } // window start key interpolation @@ -1646,18 +1601,15 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS for (int32_t k = prevWindowIndex; k < curIndex; ++k) { SResultRow *pRes = pWindowResInfo->pResult[k]; - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &pRes->win, masterScan, &hasTimeWindow, &pResult); + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); setTimeWindowEKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &pRes->win); - - bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); - doRowwiseApplyFunctions(pRuntimeEnv, closed, &pRes->win, offset); + doRowwiseApplyFunctions(pRuntimeEnv, &pRes->win, offset); } // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, - &pResult); + ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1666,8 +1618,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &win); } - bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); - doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset); + doRowwiseApplyFunctions(pRuntimeEnv, &win, offset); STimeWindow nextWin = win; int32_t index = pWindowResInfo->curIndex; @@ -1684,16 +1635,13 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } // null data, failed to allocate more memory buffer - hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) { + int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { break; } - if (hasTimeWindow) { - setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin); - closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); - doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset); - } + setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin); + doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset); } pWindowResInfo->curIndex = index; @@ -1728,6 +1676,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } + _end: assert(offset >= 0); if (tsCols != NULL) { item->lastKey = tsCols[offset] + step; @@ -1755,28 +1704,26 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; - STableQueryInfo* pTableQInfo = pQuery->current; - SResultRowInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; + STableQueryInfo* pTableQueryInfo = pQuery->current; + SResultRowInfo* pResultRowInfo = &pRuntimeEnv->windowResInfo; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) { - rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); + rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock); } else { - blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); + blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); } // update the lastkey of current table TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; - pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // interval query with limit applied int32_t numOfRes = 0; - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); - } else if (pRuntimeEnv->groupbyNormalCol) { - closeAllResultRows(pWindowResInfo); - numOfRes = pWindowResInfo->size; + if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { + numOfRes = pResultRowInfo->size; + updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery)); } else { // projection query - numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); + numOfRes = (int32_t) getNumOfResult(pRuntimeEnv); // update the number of output result if (numOfRes > 0 && pQuery->checkBuffer == 1) { @@ -1791,8 +1738,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl setQueryStatus(pQuery, QUERY_COMPLETED); } - if (((pTableQInfo->lastKey > pTableQInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) || - ((pTableQInfo->lastKey < pTableQInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) { + if (((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) || + ((pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) { setQueryStatus(pQuery, QUERY_COMPLETED); } } @@ -2610,6 +2557,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW *status = BLK_DATA_NO_NEEDED; SQuery *pQuery = pRuntimeEnv->pQuery; + int64_t groupId = pQuery->current->groupIndex; + SQueryCostInfo* pCost = &pRuntimeEnv->summary; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) { @@ -2626,15 +2575,13 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - bool hasTimeWindow = false; SResultRow* pResult = NULL; bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow, &pResult) != - TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) { // todo handle error in set result for timewindow } } @@ -2832,13 +2779,9 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo if (QUERY_IS_ASC_QUERY(pQuery)) { getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w); - pWindowResInfo->startTime = w.skey; pWindowResInfo->prevSKey = w.skey; - } else { - // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp + } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, pQuery->window.ekey, pBlockInfo->window.ekey, &w); - - pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->prevSKey = w.skey; } } @@ -2908,13 +2851,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { setQueryStatus(pQuery, QUERY_COMPLETED); } - if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) { - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - closeAllResultRows(&pRuntimeEnv->windowResInfo); - pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window - } else { - assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); - } + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + closeAllResultRows(&pRuntimeEnv->windowResInfo); + pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window } return 0; @@ -3019,7 +2958,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { } } -static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) { +static UNUSED_FUNC void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -3168,19 +3107,18 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim typedef struct SCompSupporter { STableQueryInfo **pTableQueryInfo; - int32_t * position; - SQInfo * pQInfo; + int32_t *rowIndex; + int32_t order; } SCompSupporter; int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { - int32_t left = *(int32_t *)pLeft; + int32_t left = *(int32_t *)pLeft; int32_t right = *(int32_t *)pRight; SCompSupporter * supporter = (SCompSupporter *)param; - SQueryRuntimeEnv *pRuntimeEnv = &supporter->pQInfo->runtimeEnv; - int32_t leftPos = supporter->position[left]; - int32_t rightPos = supporter->position[right]; + int32_t leftPos = supporter->rowIndex[left]; + int32_t rightPos = supporter->rowIndex[right]; /* left source is exhausted */ if (leftPos == -1) { @@ -3192,53 +3130,55 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) return -1; } - SResultRowInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; - SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); - tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId); + STableQueryInfo** pList = supporter->pTableQueryInfo; - char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1); - TSKEY leftTimestamp = GET_INT64_VAL(b1); + SResultRowInfo *pWindowResInfo1 = &(pList[left]->windowResInfo); + SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos); + TSKEY leftTimestamp = pWindowRes1->win.skey; - SResultRowInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; + SResultRowInfo *pWindowResInfo2 = &(pList[right]->windowResInfo); SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos); - tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId); - - char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2); - TSKEY rightTimestamp = GET_INT64_VAL(b2); + TSKEY rightTimestamp = pWindowRes2->win.skey; if (leftTimestamp == rightTimestamp) { return 0; } - return leftTimestamp > rightTimestamp ? 1 : -1; + if (supporter->order == TSDB_ORDER_ASC) { + return (leftTimestamp > rightTimestamp)? 1:-1; + } else { + return (leftTimestamp < rightTimestamp)? 1:-1; + } } -int32_t mergeIntoGroupResult(SQInfo *pQInfo) { +int32_t mergeGroupResult(SQInfo *pQInfo) { int64_t st = taosGetTimestampUs(); - int32_t ret = TSDB_CODE_SUCCESS; - int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); + SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; + int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); while (pQInfo->groupIndex < numOfGroups) { SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); - ret = mergeIntoGroupResultImpl(pQInfo, group); - if (ret < 0) { // not enough disk space to save the data into disk + + int32_t ret = mergeIntoGroupResultImpl(pGroupResInfo, group, pQInfo); + if (ret < 0) { return -1; } - pQInfo->groupIndex += 1; - // this group generates at least one result, return results - if (ret > 0) { + pQInfo->groupIndex += 1; + if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { break; } - assert(pQInfo->groupResInfo.numOfDataPages == 0); qDebug("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1); + taosArrayClear(pGroupResInfo->pRows); + + pGroupResInfo->index = 0; + pGroupResInfo->rowId = 0; } - SGroupResInfo* info = &pQInfo->groupResInfo; - if (pQInfo->groupIndex == numOfGroups && info->pageId == info->numOfDataPages) { + if (pQInfo->groupIndex == numOfGroups && taosArrayGetSize(pGroupResInfo->pRows) == 0) { SET_STABLE_QUERY_OVER(pQInfo); } @@ -3250,89 +3190,28 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { return TSDB_CODE_SUCCESS; } +static int32_t doCopyToSData(SQInfo *pQInfo, SResultRow **pRows, int32_t numOfRows, int32_t* index, int32_t orderType); + void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; - // all results have been return to client, try next group - if (pGroupResInfo->pageId == pGroupResInfo->numOfDataPages) { - pGroupResInfo->numOfDataPages = 0; - pGroupResInfo->pageId = 0; - pGroupResInfo->rowId = 0; - + // all results in current group have been returned to client, try next group + if (pGroupResInfo->index >= taosArrayGetSize(pGroupResInfo->pRows)) { // current results of group has been sent to client, try next group - if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) { + if (mergeGroupResult(pQInfo) != TSDB_CODE_SUCCESS) { return; // failed to save data in the disk } // check if all results has been sent to client int32_t numOfGroup = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); - if (pGroupResInfo->numOfDataPages == 0 && pQInfo->groupIndex == numOfGroup) { + if (taosArrayGetSize(pGroupResInfo->pRows) == 0 && pQInfo->groupIndex == numOfGroup) { SET_STABLE_QUERY_OVER(pQInfo); return; } } - SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; - SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - - int32_t id = pQInfo->groupResInfo.groupId; - SIDList list = getDataBufPagesIdList(pResultBuf, id); - - int32_t offset = 0; - int32_t numOfCopiedRows = 0; - - size_t size = taosArrayGetSize(list); - assert(size == pGroupResInfo->numOfDataPages); - - bool done = false; - - //TODO add API for release none-dirty pages -// SPageInfo* prev = NULL; - - for (int32_t j = pGroupResInfo->pageId; j < size; ++j) { - SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j); - tFilePage* pData = getResBufPage(pResultBuf, pi->pageId); - - // release previous buffer pages -// if (prev == NULL) { -// prev = pi; -// } else { -// if (prev->pageId != pi->pageId) { -// releaseResBufPageInfo(pResultBuf, prev); -// prev = pi; -// } -// } - - assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num); - int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId); - - if (numOfRes > pQuery->rec.capacity - offset) { - numOfCopiedRows = (int32_t)(pQuery->rec.capacity - offset); - pGroupResInfo->rowId += numOfCopiedRows; - done = true; - } else { - numOfCopiedRows = (int32_t)pData->num; - - pGroupResInfo->pageId += 1; - pGroupResInfo->rowId = 0; - } - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - char * pDest = pQuery->sdata[i]->data; - - memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage, - (size_t)bytes * numOfCopiedRows); - } - - offset += numOfCopiedRows; - if (done) { - break; - } - } - - assert(pQuery->rec.rows == 0); - pQuery->rec.rows += offset; + int32_t size = (int32_t) taosArrayGetSize(pGroupResInfo->pRows); + pQuery->rec.rows = doCopyToSData(pQInfo, pGroupResInfo->pRows->pData, (int32_t) size, &pGroupResInfo->index, TSDB_ORDER_ASC); } int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) { @@ -3360,155 +3239,99 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResu return 0; } -int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { +int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableList, SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; + bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery); + + int32_t code = TSDB_CODE_SUCCESS; - size_t size = taosArrayGetSize(pGroup); - tFilePage **buffer = pQuery->sdata; + int32_t *posList = NULL; + SLoserTreeInfo *pTree = NULL; + STableQueryInfo **pTableQueryInfoList = NULL; - int32_t *posList = calloc(size, sizeof(int32_t)); - STableQueryInfo **pTableList = malloc(POINTER_BYTES * size); + size_t size = taosArrayGetSize(pTableList); + if (pGroupResInfo->pRows == NULL) { + pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); + } - if (pTableList == NULL || posList == NULL) { - tfree(posList); - tfree(pTableList); + posList = calloc(size, sizeof(int32_t)); + pTableQueryInfoList = malloc(POINTER_BYTES * size); + if (pTableQueryInfoList == NULL || posList == NULL) { qError("QInfo:%p failed alloc memory", pQInfo); - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _end; } - // todo opt for the case of one table per group int32_t numOfTables = 0; - SIDList pageList = NULL; - int32_t tid = -1; - for (int32_t i = 0; i < size; ++i) { - STableQueryInfo *item = taosArrayGetP(pGroup, i); - - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid); - if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) { - pTableList[numOfTables++] = item; - tid = TSDB_TABLEID(item->pTable)->tid; - pageList = list; + STableQueryInfo *item = taosArrayGetP(pTableList, i); + if (item->windowResInfo.size > 0) { + pTableQueryInfoList[numOfTables++] = item; } } // there is no data in current group + // no need to merge results since only one table in each group if (numOfTables == 0) { - tfree(posList); - tfree(pTableList); - return 0; - } else if (numOfTables == 1) { // no need to merge results since only one table in each group - tfree(posList); - tfree(pTableList); - - SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; - - pGroupResInfo->numOfDataPages = (int32_t)taosArrayGetSize(pageList); - pGroupResInfo->groupId = tid; - pGroupResInfo->pageId = 0; - pGroupResInfo->rowId = 0; - - return pGroupResInfo->numOfDataPages; + goto _end; } - SCompSupporter cs = {pTableList, posList, pQInfo}; - - SLoserTreeInfo *pTree = NULL; - tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); + SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQuery->order.order}; - SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool); - resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); - - pQInfo->groupResInfo.groupId = getMergeResultGroupId(pQInfo->groupIndex); + int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); + if (ret != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _end; + } - // todo add windowRes iterator - int64_t lastTimestamp = -1; + int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX; int64_t startt = taosGetTimestampMs(); while (1) { if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); - - tfree(pTableList); - tfree(posList); - tfree(pTree); - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); + code = TSDB_CODE_TSC_QUERY_CANCELLED; + goto _end; } - int32_t pos = pTree->pNode[0].index; - - SResultRowInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; - SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]); - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); + int32_t tableIndex = pTree->pNode[0].index; - char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page); - TSKEY ts = GET_INT64_VAL(b); + SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->windowResInfo; + SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]); - assert(ts == pWindowRes->win.skey); int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes); if (num <= 0) { - cs.position[pos] += 1; + cs.rowIndex[tableIndex] += 1; - if (cs.position[pos] >= pWindowResInfo->size) { - cs.position[pos] = -1; - - // all input sources are exhausted - if (--numOfTables == 0) { + if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) { + cs.rowIndex[tableIndex] = -1; + if (--numOfTables == 0) { // all input sources are exhausted break; } } } else { - if (ts == lastTimestamp) { // merge with the last one - doMerge(pRuntimeEnv, ts, pWindowRes, true); - } else { // copy data to disk buffer - if (buffer[0]->num == pQuery->rec.capacity) { - if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) { - return -1; - } - - resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow); - } + assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery)); - doMerge(pRuntimeEnv, ts, pWindowRes, false); - buffer[0]->num += 1; + if (pWindowRes->win.skey != lastTimestamp) { + taosArrayPush(pGroupResInfo->pRows, &pWindowRes); + pWindowRes->numOfRows = num; } - lastTimestamp = ts; + lastTimestamp = pWindowRes->win.skey; - // move to the next element of current entry - int32_t currentPageId = pWindowRes->pageId; - - cs.position[pos] += 1; - if (cs.position[pos] >= pWindowResInfo->size) { - cs.position[pos] = -1; + // move to the next row of current entry + if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) { + cs.rowIndex[tableIndex] = -1; // all input sources are exhausted - if (--numOfTables == 0) { + if ((--numOfTables) == 0) { break; } - } else { - // current page is not needed anymore - SResultRow *pNextWindowRes = getResultRow(pWindowResInfo, cs.position[pos]); - if (pNextWindowRes->pageId != currentPageId) { - releaseResBufPage(pRuntimeEnv->pResultBuf, page); - } } } - tLoserTreeAdjust(pTree, pos + pTree->numOfEntries); - } - - if (buffer[0]->num != 0) { // there are data in buffer - if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) { - qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo); - - tfree(pTree); - tfree(pTableList); - tfree(posList); - return -1; - } + tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries); } int64_t endt = taosGetTimestampMs(); @@ -3519,65 +3342,16 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pQInfo, pQInfo->groupIndex, endt - startt); - tfree(pTableList); + _end: + tfree(pTableQueryInfoList); tfree(posList); tfree(pTree); -// tfree(pResultInfo); -// tfree(buf); - - return pQInfo->groupResInfo.numOfDataPages; -} - -int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - - // the base value for group result, since the maximum number of table for each vnode will not exceed 100,000. - int32_t pageId = -1; - int32_t capacity = pResultBuf->numOfRowsPerPage; - - int32_t remain = (int32_t) pQuery->sdata[0]->num; - int32_t offset = 0; - - while (remain > 0) { - int32_t rows = (remain > capacity)? capacity:remain; - assert(rows > 0); - - // get the output buffer page - tFilePage *buf = getNewDataBuf(pResultBuf, pGroupResInfo->groupId, &pageId); - buf->num = rows; - - // pagewisely copy to dest buffer - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - - char* output = buf->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage; - char* src = ((char *) pQuery->sdata[i]->data) + offset * bytes; - memcpy(output, src, (size_t)(buf->num * bytes)); - } - - offset += rows; - remain -= rows; - - pGroupResInfo->numOfDataPages += 1; + if (code != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, code); } - return TSDB_CODE_SUCCESS; -} - -void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow) { - SQuery* pQuery = pRuntimeEnv->pQuery; - - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes; - pCtx[k].size = 1; - pCtx[k].startOffset = 0; - pCtx[k].resultInfo = getResultCell(pRuntimeEnv, pRow, k); - - pQuery->sdata[k]->num = 0; - } + return code; } static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) { @@ -3698,7 +3472,7 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t tid = 0; - int64_t uid = getResultInfoUId(pRuntimeEnv); + int64_t uid = 0; SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&tid, sizeof(tid), true, uid); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -3830,12 +3604,8 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SResultRow *pResult = getResultRow(pWindowResInfo, i); - if (!pResult->closed) { - continue; - } setResultOutputBuf(pRuntimeEnv, pResult); - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int16_t functId = pQuery->pExpr1[j].base.functionId; if (functId == TSDB_FUNC_TS) { @@ -4119,9 +3889,9 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { * @param pDataBlockInfo */ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; // lastKey needs to be updated pTableQueryInfo->lastKey = nextKey; @@ -4134,12 +3904,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { return; } - uint64_t uid = getResultInfoUId(pRuntimeEnv); + int64_t uid = 0; SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid); - if (pResultRow == NULL) { - return; - } + assert (pResultRow != NULL); /* * not assign result buffer yet, add new result buffer @@ -4285,7 +4053,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { /** * In handling the both ascending and descending order super table query, we need to find the first qualified * timestamp of this table, and then set the first qualified start timestamp. - * In ascending query, key is the first qualified timestamp. However, in the descending order query, additional + * In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional * operations involve. */ STimeWindow w = TSWINDOW_INITIALIZER; @@ -4294,7 +4062,6 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { TSKEY sk = MIN(win.skey, win.ekey); TSKEY ek = MAX(win.skey, win.ekey); getAlignQueryTimeWindow(pQuery, win.skey, sk, ek, &w); - pWindowResInfo->startTime = pTableQueryInfo->win.skey; // windowSKey may be 0 in case of 1970 timestamp if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { if (!QUERY_IS_ASC_QUERY(pQuery)) { @@ -4333,36 +4100,33 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { return loadPrimaryTS; } -static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_t orderType) { +static int32_t doCopyToSData(SQInfo *pQInfo, SResultRow **pRows, int32_t numOfRows, int32_t *index, int32_t orderType) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; int32_t numOfResult = 0; - int32_t startIdx = 0; + int32_t start = 0; int32_t step = -1; qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); - int32_t totalSet = numOfClosedResultRows(pResultInfo); - SResultRow** result = pResultInfo->pResult; - if (orderType == TSDB_ORDER_ASC) { - startIdx = pQInfo->groupIndex; + start = (*index); step = 1; } else { // desc order copy all data - startIdx = totalSet - pQInfo->groupIndex - 1; + start = numOfRows - (*index) - 1; step = -1; } SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; - for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) { - if (result[i]->numOfRows == 0) { - pQInfo->groupIndex += 1; + for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { + if (pRows[i]->numOfRows == 0) { + (*index) += 1; pGroupResInfo->rowId = 0; continue; } - int32_t numOfRowsToCopy = result[i]->numOfRows - pGroupResInfo->rowId; + int32_t numOfRowsToCopy = pRows[i]->numOfRows - pGroupResInfo->rowId; int32_t oldOffset = pGroupResInfo->rowId; /* @@ -4374,16 +4138,16 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_ pGroupResInfo->rowId += numOfRowsToCopy; } else { pGroupResInfo->rowId = 0; - pQInfo->groupIndex += 1; + (*index) += 1; } - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i]->pageId); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRows[i]->pageId); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t size = pRuntimeEnv->pCtx[j].outputBytes; char *out = pQuery->sdata[j]->data + numOfResult * size; - char *in = getPosInResultPage(pRuntimeEnv, j, result[i], page); + char *in = getPosInResultPage(pRuntimeEnv, j, pRows[i], page); memcpy(out, in + oldOffset * size, size * numOfRowsToCopy); } @@ -4414,10 +4178,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC; - int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo, orderType); + int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo->pResult, pResultInfo->size, &pQInfo->groupIndex, orderType); pQuery->rec.rows += numOfResult; - assert(pQuery->rec.rows <= pQuery->rec.capacity); } @@ -4449,25 +4212,17 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc SQuery * pQuery = pRuntimeEnv->pQuery; STableQueryInfo* pTableQueryInfo = pQuery->current; - SResultRowInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; + SResultRowInfo * pResultRowInfo = &pTableQueryInfo->windowResInfo; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) { - rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); + rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock); } else { - blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); + blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); } if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); - - // TODO refactor - if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { - closeAllResultRows(pWindowResInfo); - pWindowResInfo->curIndex = pWindowResInfo->size - 1; - } else { - updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery); - } + updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery)); } } @@ -4801,13 +4556,10 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { if (QUERY_IS_ASC_QUERY(pQuery)) { if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &w); - pWindowResInfo->startTime = w.skey; pWindowResInfo->prevSKey = w.skey; } } else { getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &w); - - pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->prevSKey = w.skey; } @@ -5758,6 +5510,7 @@ static void doRestoreContext(SQInfo *pQInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); + SWITCH_ORDER(pQuery->order.order); if (pRuntimeEnv->pTsBuf != NULL) { SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); @@ -5797,9 +5550,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { */ if (QUERY_IS_INTERVAL_QUERY(pQuery)) { copyResToQueryResultBuf(pQInfo, pQuery); -#ifdef _DEBUG_VIEW - displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num); -#endif } else { copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); } @@ -5844,7 +5594,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) { - if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { + if (mergeGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); #ifdef _DEBUG_VIEW @@ -6016,33 +5766,6 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) } } -static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - while (1) { - scanOneTableDataBlocks(pRuntimeEnv, start); - - assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED)); - finalizeQueryResult(pRuntimeEnv); - - // here we can ignore the records in case of no interpolation - // todo handle offset, in case of top/bottom interval query - if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL) && pQuery->limit.offset > 0 && - pQuery->fillType == TSDB_FILL_NONE) { - // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); - - int32_t c = (int32_t)(MIN(numOfClosed, pQuery->limit.offset)); - popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, c); - pQuery->limit.offset -= c; - } - - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_RESBUF_FULL)) { - break; - } - } -} - // handle time interval query on table static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); @@ -6050,7 +5773,6 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; pQuery->current = pTableInfo; - int32_t numOfFilled = 0; TSKEY newStartKey = TSKEY_INITIAL_VAL; // skip blocks without load the actual data block from file if no filter condition present @@ -6062,59 +5784,39 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } } - while (1) { - tableIntervalProcessImpl(pRuntimeEnv, newStartKey); + scanOneTableDataBlocks(pRuntimeEnv, newStartKey); + assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED)); - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - pQInfo->groupIndex = 0; // always start from 0 - pQuery->rec.rows = 0; - copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); + finalizeQueryResult(pRuntimeEnv); - popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); - } + // skip offset result rows + pQuery->rec.rows = 0; - // no result generated, abort - if (pQuery->rec.rows == 0 || pRuntimeEnv->groupbyNormalCol) { - break; + if (pQuery->fillType == TSDB_FILL_NONE) { + // all data scanned, the group by normal column can return + int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); + if (pQuery->limit.offset > numOfClosed) { + return; } - doSecondaryArithmeticProcess(pQuery); - - // the offset is handled at prepare stage if no interpolation involved - if (pQuery->fillType == TSDB_FILL_NONE) { - limitResults(pRuntimeEnv); - break; - } else { - taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, pQuery->window.ekey); - taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage**) pQuery->sdata); - numOfFilled = 0; + pQInfo->groupIndex = pQuery->limit.offset; - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled); - if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - limitResults(pRuntimeEnv); - break; - } + copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); + doSecondaryArithmeticProcess(pQuery); - // no result generated yet, continue retrieve data - pQuery->rec.rows = 0; - } - } + limitResults(pRuntimeEnv); + } else { - // all data scanned, the group by normal column can return - if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result - // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); + copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); + doSecondaryArithmeticProcess(pQuery); - if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) { - // skip offset result rows - popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (int32_t) pQuery->limit.offset); + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, pQuery->window.ekey); + taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata); - pQuery->rec.rows = 0; - pQInfo->groupIndex = 0; - copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); + int32_t numOfFilled = 0; + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled); - doSecondaryArithmeticProcess(pQuery); + if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { limitResults(pRuntimeEnv); } } @@ -6125,7 +5827,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; if (queryHasRemainResForTableQuery(pRuntimeEnv)) { - if (pQuery->fillType != TSDB_FILL_NONE) { /* * There are remain results that are not returned due to result interpolation @@ -6142,23 +5843,23 @@ static void tableQueryImpl(SQInfo *pQInfo) { return; } else { pQuery->rec.rows = 0; - pQInfo->groupIndex = 0; // always start from 0 + assert(pRuntimeEnv->windowResInfo.size > 0); - if (pRuntimeEnv->windowResInfo.size > 0) { + if (pQInfo->groupIndex < pRuntimeEnv->windowResInfo.size) { copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); - - if (pQuery->rec.rows > 0) { - qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total); + } - // there are not data remains - if (pRuntimeEnv->windowResInfo.size <= 0) { - qDebug("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total); - } + if (pQuery->rec.rows > 0) { + qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pQuery->rec.rows, + pQuery->rec.total); + } - return; - } + // there are not data remains + if (pQuery->rec.rows <= 0 || pRuntimeEnv->windowResInfo.size <= pQInfo->groupIndex) { + qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total); } + + return; } } @@ -7276,6 +6977,8 @@ static void freeQInfo(SQInfo *pQInfo) { tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); taosHashCleanup(pQInfo->arrTableIdInfo); + taosArrayDestroy(pQInfo->groupResInfo.pRows); + pQInfo->signature = 0; qDebug("QInfo:%p QInfo is freed", pQInfo); @@ -7695,7 +7398,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co (*pRsp)->completed = 1; // notify no more result to client } else { *continueExec = true; - qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); + qDebug("QInfo:%p has more results to retrieve", pQInfo); } return pQInfo->code; @@ -8060,6 +7763,4 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) { taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle); return 0; -} - - +} \ No newline at end of file diff --git a/src/query/src/qHistogram.c b/src/query/src/qHistogram.c index 35e5906d1f..bd979f1244 100644 --- a/src/query/src/qHistogram.c +++ b/src/query/src/qHistogram.c @@ -120,6 +120,7 @@ //} static int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val); +static int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val); SHistogramInfo* tHistogramCreate(int32_t numOfEntries) { /* need one redundant slot */ @@ -158,8 +159,8 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { } #if defined(USE_ARRAYLIST) - int32_t idx = vnodeHistobinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val); - assert(idx >= 0 && idx <= (*pHisto)->maxEntries); + int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val); + assert(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL); if ((*pHisto)->elems[idx].val == val && idx >= 0) { (*pHisto)->elems[idx].num += 1; @@ -356,7 +357,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { return 0; } -int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val) { +int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) { int32_t end = len - 1; int32_t start = 0; @@ -466,7 +467,7 @@ void tHistogramPrint(SHistogramInfo* pHisto) { */ int64_t tHistogramSum(SHistogramInfo* pHisto, double v) { #if defined(USE_ARRAYLIST) - int32_t slotIdx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, v); + int32_t slotIdx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, v); if (pHisto->elems[slotIdx].val != v) { slotIdx -= 1; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 65ac60e91f..d09993ae4e 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -96,8 +96,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo pResultRowInfo->curIndex = -1; pResultRowInfo->size = 0; - - pResultRowInfo->startTime = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; } @@ -110,7 +108,7 @@ void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow assert(num >= 0 && num <= numOfClosed); int16_t type = pResultRowInfo->type; - int64_t uid = getResultInfoUId(pRuntimeEnv); + int64_t uid = 0; char *key = NULL; int16_t bytes = -1; @@ -181,11 +179,12 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); for (int32_t i = 0; i < pResultRowInfo->size; ++i) { - if (pResultRowInfo->pResult[i]->closed) { + SResultRow* pRow = pResultRowInfo->pResult[i]; + if (pRow->closed) { continue; } - pResultRowInfo->pResult[i]->closed = true; + pRow->closed = true; } } @@ -383,18 +382,4 @@ void* destroyResultRowPool(SResultRowPool* p) { tfree(p); return NULL; -} - -uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) { - if (!pRuntimeEnv->stableQuery) { - return 0; // for simple table query, the uid is always set to be 0; - } - - SQuery* pQuery = pRuntimeEnv->pQuery; - if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) { - return 0; - } - - STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable); - return id->uid; } \ No newline at end of file -- GitLab