diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index ea073e8192e73f0f2bc1e3d8351decd9177ba403..8c835806a77732e02398b9fdbf07be135c19c0ab 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -489,7 +489,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { tscDebug("%p waiting for delete procedure, status: %d", pSql, status); } - pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo); + pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo); if (pLocalReducer->pCtx != NULL) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ecedf4c13f6337cb898e5e4819188835ff6275a5..ab20840cbeac2f6b3f906869aff2f33ae653d755 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -40,6 +40,19 @@ typedef struct SGroupResInfo { int32_t rowId; } SGroupResInfo; +typedef struct SWindowResultPool { + int32_t elemSize; + int32_t blockSize; + int32_t numOfElemPerBlock; + + struct { + int32_t blockIndex; + int32_t pos; + } position; + + SArray* pData; // SArray +} SWindowResultPool; + typedef struct SSqlGroupbyExpr { int16_t tableIndex; SArray* columnInfo; // SArray, group by columns information @@ -69,9 +82,7 @@ typedef struct SResultRec { } SResultRec; typedef struct SWindowResInfo { - SWindowResult* pResult; // result list -// uint64_t uid; // table uid, in order to identify the result from global hash table -// SHashObj* hashList; // hash list for quick access + SWindowResult** pResult; // result list int16_t type; // data type for hash key int32_t capacity; // max capacity int32_t curIndex; // current start active index @@ -180,6 +191,7 @@ typedef struct SQueryRuntimeEnv { SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pWindowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer + SWindowResultPool* pool; // window result object pool } SQueryRuntimeEnv; enum { diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index 6d44fee09557fa8b8d73527677a2c7b238ecb42c..329ea9a789cf7e5c8963788c68981b63ccb234e3 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -71,7 +71,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp); -void* taosDestoryFillInfo(SFillInfo *pFillInfo); +void* taosDestroyFillInfo(SFillInfo *pFillInfo); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index fc997a44c595ac1ae1792c2afd24c5867b7b046a..377578042394590c75d9bba67dfc5de8559777d8 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -44,7 +44,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); - return &pWindowResInfo->pResult[slot]; + return pWindowResInfo->pResult[slot]; } #define curTimeWindowIndex(_winres) ((_winres)->curIndex) @@ -71,4 +71,14 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval); __filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type); +size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv); + +SWindowResultPool* initWindowResultPool(size_t size); +SWindowResult* getNewWindowResult(SWindowResultPool* p); +int64_t getWindowResultPoolMemSize(SWindowResultPool* p); +void* destroyWindowResultPool(SWindowResultPool* p); +int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p); +int32_t getNumOfUsedWindowResult(SWindowResultPool* p); + + #endif // TDENGINE_QUERYUTIL_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 32ed7190c6d56bd3d87dee09aa33bdd3b1ec68e5..d5e1962eeaa94a13e202222d9c825d03929169fb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -469,30 +469,32 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin newCapacity = (int64_t)(pWindowResInfo->capacity * 1.5); } - char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * sizeof(SWindowResult))); - pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SWindowResult); - pRuntimeEnv->summary.numOfTimeWindows += (newCapacity - pWindowResInfo->capacity); + char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); + // pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SWindowResult); + // pRuntimeEnv->summary.numOfTimeWindows += (newCapacity - pWindowResInfo->capacity); if (t == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - pWindowResInfo->pResult = (SWindowResult *)t; + pWindowResInfo->pResult = (SWindowResult **)t; int32_t inc = (int32_t)newCapacity - pWindowResInfo->capacity; - memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc); + memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, POINTER_BYTES * inc); - pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * inc; - - for (int32_t i = pWindowResInfo->capacity; i < newCapacity; ++i) { - int32_t ret = createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); + pWindowResInfo->capacity = (int32_t)newCapacity; + } +// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * inc; + SWindowResult* pResult = getNewWindowResult(pRuntimeEnv->pool); + pWindowResInfo->pResult[pWindowResInfo->size] = pResult; +// for (int32_t i = pWindowResInfo->capacity; i < newCapacity; ++i) { + int32_t ret = createQueryResultInfo(pQuery, pResult, pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); if (ret != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - } +// } - pWindowResInfo->capacity = (int32_t)newCapacity; - } +// } // add a new result set for a new group pWindowResInfo->curIndex = pWindowResInfo->size++; @@ -632,7 +634,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes static bool getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(slot >= 0 && slot < pWindowResInfo->size); - return pWindowResInfo->pResult[slot].closed; + return pWindowResInfo->pResult[slot]->closed; } static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, @@ -691,7 +693,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe int64_t skey = TSKEY_INITIAL_VAL; for (i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = &pWindowResInfo->pResult[i]; + SWindowResult *pResult = pWindowResInfo->pResult[i]; if (pResult->closed) { numOfClosed += 1; continue; @@ -715,7 +717,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe pWindowResInfo->curIndex = i; } - pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].win.skey; + pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; // the number of completed slots are larger than the threshold, return current generated results to client. if (numOfClosed > pWindowResInfo->threshold) { @@ -1756,7 +1758,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosTFree(pRuntimeEnv->pCtx); } - pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo); + pRuntimeEnv->pFillInfo = taosDestroyFillInfo(pRuntimeEnv->pFillInfo); destroyResultBuf(pRuntimeEnv->pResultBuf); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); @@ -1767,6 +1769,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosHashCleanup(pRuntimeEnv->pWindowHashTable); pRuntimeEnv->pWindowHashTable = NULL; + + pRuntimeEnv->pool = destroyWindowResultPool(pRuntimeEnv->pool); } #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) @@ -3326,11 +3330,13 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize) { int32_t numOfCols = pQuery->numOfOutput; - size_t size = numOfCols * sizeof(SResultInfo) + interBufSize; - pResultRow->resultInfo = calloc(1, size); - if (pResultRow->resultInfo == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } +// size_t size = numOfCols * sizeof(SResultInfo) + interBufSize; + pResultRow->resultInfo = (SResultInfo*)((char*)pResultRow + sizeof(SWindowResult)); + +// pResultRow->resultInfo = calloc(1, size); +// if (pResultRow->resultInfo == NULL) { +// return TSDB_CODE_QRY_OUT_OF_MEMORY; +// } pResultRow->pageId = -1; pResultRow->rowId = -1; @@ -3698,7 +3704,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *buf = &pWindowResInfo->pResult[i]; + SWindowResult *buf = pWindowResInfo->pResult[i]; if (!isWindowResClosed(pWindowResInfo, i)) { continue; } @@ -4009,7 +4015,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); int32_t totalSet = numOfClosedTimeWindow(pResultInfo); - SWindowResult* result = pResultInfo->pResult; + SWindowResult** result = pResultInfo->pResult; if (orderType == TSDB_ORDER_ASC) { startIdx = pQInfo->groupIndex; @@ -4022,13 +4028,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) { - if (result[i].numOfRows == 0) { + if (result[i]->numOfRows == 0) { pQInfo->groupIndex += 1; pGroupResInfo->rowId = 0; continue; } - int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->rowId; + int32_t numOfRowsToCopy = result[i]->numOfRows - pGroupResInfo->rowId; int32_t oldOffset = pGroupResInfo->rowId; /* @@ -4043,13 +4049,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_ pQInfo->groupIndex += 1; } - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pageId); + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i]->pageId); for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t size = pRuntimeEnv->pCtx[j].outputBytes; char *out = pQuery->sdata[j]->data + numOfResult * size; - char *in = getPosInResultPage(pRuntimeEnv, j, &result[i], page); + char *in = getPosInResultPage(pRuntimeEnv, j, result[i], page); memcpy(out, in + oldOffset * size, size * numOfRowsToCopy); } @@ -4096,7 +4102,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) { } for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) { - SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i]; + SWindowResult *pResult = pRuntimeEnv->windowResInfo.pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pRuntimeEnv->pCtx[j].functionId; @@ -4251,31 +4257,22 @@ static void queryCostStatis(SQInfo *pQInfo) { SQueryCostInfo *pSummary = &pRuntimeEnv->summary; uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pWindowHashTable); - hashSize += taosHashGetMemSize(pQInfo->tableqinfoGroupInfo.map); - int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo); - for(int32_t i = 0; i < numOfGroup; ++i) { - SArray* pa = GET_TABLEGROUP(pQInfo, i); - - int32_t numOfTables = taosArrayGetSize(pa); - for(int32_t j = 0; j < numOfTables; ++j) { -// STableQueryInfo* pTableQueryInfo = taosArrayGetP(pa, j); - -// hashSize += taosHashGetMemSize(pTableQueryInfo->windowResInfo.hashList); - } - } - pSummary->hashSize = hashSize; // add the merge time pSummary->elapsedTime += pSummary->firstStageMergeTime; + SWindowResultPool* p = pQInfo->runtimeEnv.pool; + pSummary->winInfoSize = getWindowResultPoolMemSize(p); + pSummary->numOfTimeWindows = getNumOfAllocatedWindowResult(p); + qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, pQInfo, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); - qDebug("QInfo:%p :cost summary: windowInfo size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo, pSummary->winInfoSize/1024.0, + qDebug("QInfo:%p :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo, pSummary->winInfoSize/1024.0, pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); } @@ -5034,9 +5031,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - pWindowResInfo->pResult[i].closed = true; // enable return all results for group by normal columns + pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns - SWindowResult *pResult = &pWindowResInfo->pResult[i]; + SWindowResult *pResult = pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes)); } @@ -6337,6 +6334,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.pWindowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); + pQInfo->runtimeEnv.pool = initWindowResultPool(getWindowResultSize(&pQInfo->runtimeEnv)); pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pQInfo->pBuf == NULL) { diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index f186726c0120f2e2e34580fec0da00c2083e5da9..99fa3a8e0ffc9b53ae370d4e5860bf16a8988367 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -91,7 +91,7 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) { pFillInfo->numOfTotal = 0; } -void* taosDestoryFillInfo(SFillInfo* pFillInfo) { +void* taosDestroyFillInfo(SFillInfo* pFillInfo) { if (pFillInfo == NULL) { return NULL; } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 8273a2139535916e07bbc3e44c222acea3269036..7ec39cc0c895871400e5d5694dace8c0343c0235 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -14,8 +14,9 @@ */ #include "os.h" -#include "hash.h" #include "taosmsg.h" +#include "hash.h" + #include "qExecutor.h" #include "qUtil.h" @@ -40,38 +41,29 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun pWindowResInfo->size = 0; pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; - SQueryCostInfo* pSummary = &pRuntimeEnv->summary; +// SQueryCostInfo* pSummary = &pRuntimeEnv->summary; - // use the pointer arraylist - pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, sizeof(SWindowResult)); + pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES); if (pWindowResInfo->pResult == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } pWindowResInfo->interval = pRuntimeEnv->pQuery->interval.interval; - pSummary->winInfoSize += sizeof(SWindowResult) * pWindowResInfo->capacity; - pSummary->winInfoSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity; - pSummary->numOfTimeWindows = pWindowResInfo->capacity; +// pSummary->winInfoSize += POINTER_BYTES * pWindowResInfo->capacity; +// pSummary->winInfoSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity; +// pSummary->numOfTimeWindows = pWindowResInfo->capacity; - for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { - int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } +// for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { +// int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); +// if (code != TSDB_CODE_SUCCESS) { +// return code; +// } +// } return TSDB_CODE_SUCCESS; } -void destroyTimeWindowRes(SWindowResult *pWindowRes) { - if (pWindowRes == NULL) { - return; - } - - free(pWindowRes->resultInfo); -} - void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) { if (pWindowResInfo == NULL) { return; @@ -81,13 +73,6 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) { return; } - if (pWindowResInfo->pResult != NULL) { - for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { - destroyTimeWindowRes(&pWindowResInfo->pResult[i]); - } - } - -// taosHashCleanup(pWindowResInfo->hashList); taosTFree(pWindowResInfo->pResult); } @@ -97,17 +82,13 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pWindowRes = &pWindowResInfo->pResult[i]; + SWindowResult *pWindowRes = pWindowResInfo->pResult[i]; clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); } pWindowResInfo->curIndex = -1; -// taosHashCleanup(pWindowResInfo->hashList); pWindowResInfo->size = 0; -// _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type); -// pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false); - pWindowResInfo->startTime = TSKEY_INITIAL_VAL; pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; } @@ -127,7 +108,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { int16_t bytes = -1; for (int32_t i = 0; i < num; ++i) { - SWindowResult *pResult = &pWindowResInfo->pResult[i]; + SWindowResult *pResult = pWindowResInfo->pResult[i]; if (pResult->closed) { // remove the window slot from hash table // todo refactor @@ -150,19 +131,19 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { // clear all the closed windows from the window list for (int32_t k = 0; k < remain; ++k) { - copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[num + k]); + copyTimeWindowResBuf(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k]); } // move the unclosed window in the front of the window list for (int32_t k = remain; k < pWindowResInfo->size; ++k) { - SWindowResult *pWindowRes = &pWindowResInfo->pResult[k]; + SWindowResult *pWindowRes = pWindowResInfo->pResult[k]; clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); } pWindowResInfo->size = remain; for (int32_t k = 0; k < pWindowResInfo->size; ++k) { - SWindowResult *pResult = &pWindowResInfo->pResult[k]; + SWindowResult *pResult = pWindowResInfo->pResult[k]; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { key = varDataVal(pResult->key); @@ -198,7 +179,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) { int32_t i = 0; - while (i < pWindowResInfo->size && pWindowResInfo->pResult[i].closed) { + while (i < pWindowResInfo->size && pWindowResInfo->pResult[i]->closed) { ++i; } @@ -209,11 +190,11 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - if (pWindowResInfo->pResult[i].closed) { + if (pWindowResInfo->pResult[i]->closed) { continue; } - pWindowResInfo->pResult[i].closed = true; + pWindowResInfo->pResult[i]->closed = true; } } @@ -229,19 +210,19 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_ } // get the result order - int32_t resultOrder = (pWindowResInfo->pResult[0].win.skey < pWindowResInfo->pResult[1].win.skey)? 1:-1; + int32_t resultOrder = (pWindowResInfo->pResult[0]->win.skey < pWindowResInfo->pResult[1]->win.skey)? 1:-1; if (order != resultOrder) { return; } int32_t i = 0; if (order == QUERY_ASC_FORWARD_STEP) { - TSKEY ekey = pWindowResInfo->pResult[i].win.ekey; + TSKEY ekey = pWindowResInfo->pResult[i]->win.ekey; while (i < pWindowResInfo->size && (ekey < lastKey)) { ++i; } } else if (order == QUERY_DESC_FORWARD_STEP) { - while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].win.skey > lastKey)) { + while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i]->win.skey > lastKey)) { ++i; } } @@ -318,3 +299,77 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con } } +size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) { + return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo)) + pRuntimeEnv->interBufSize + sizeof(SWindowResult); +} + +SWindowResultPool* initWindowResultPool(size_t size) { + SWindowResultPool* p = calloc(1, sizeof(SWindowResultPool)); + if (p == NULL) { + return NULL; + } + + p->numOfElemPerBlock = 128; + + p->elemSize = size; + p->blockSize = p->numOfElemPerBlock * p->elemSize; + + p->position.pos = 0; + + p->pData = taosArrayInit(8, POINTER_BYTES); + return p; +} + +SWindowResult* getNewWindowResult(SWindowResultPool* p) { + if (p == NULL) { + return NULL; + } + + void* ptr = NULL; + if (p->position.pos == 0) { + ptr = calloc(1, p->blockSize); + taosArrayPush(p->pData, &ptr); + + } else { + size_t last = taosArrayGetSize(p->pData); + + void** pBlock = taosArrayGet(p->pData, last - 1); + ptr = (*pBlock) + p->elemSize * p->position.pos; + } + + p->position.pos = (p->position.pos + 1)%p->numOfElemPerBlock; + return ptr; +} + +int64_t getWindowResultPoolMemSize(SWindowResultPool* p) { + if (p == NULL) { + return 0; + } + + return taosArrayGetSize(p->pData) * p->blockSize; +} + +int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p) { + return taosArrayGetSize(p->pData) * p->numOfElemPerBlock; +} + +int32_t getNumOfUsedWindowResult(SWindowResultPool* p) { + return getNumOfAllocatedWindowResult(p) - p->numOfElemPerBlock + p->position.pos; +} + +void* destroyWindowResultPool(SWindowResultPool* p) { + if (p == NULL) { + return NULL; + } + + size_t size = taosArrayGetSize(p->pData); + for(int32_t i = 0; i < size; ++i) { + void** ptr = taosArrayGet(p->pData, i); + taosTFree(*ptr); + } + + taosArrayDestroy(p->pData); + + taosTFree(p); + return NULL; +}