提交 2314b195 编写于 作者: H Haojun Liao

[TD-1870] reduce memory consumption during query processing.

上级 a8e86c9a
......@@ -489,7 +489,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscDebug("%p waiting for delete procedure, status: %d", pSql, status);
}
pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo);
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......
......@@ -40,6 +40,19 @@ typedef struct SGroupResInfo {
int32_t rowId;
} SGroupResInfo;
typedef struct SWindowResultPool {
int32_t elemSize;
int32_t blockSize;
int32_t numOfElemPerBlock;
struct {
int32_t blockIndex;
int32_t pos;
} position;
SArray* pData; // SArray<void*>
} SWindowResultPool;
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
......@@ -69,9 +82,7 @@ typedef struct SResultRec {
} SResultRec;
typedef struct SWindowResInfo {
SWindowResult* pResult; // result list
// uint64_t uid; // table uid, in order to identify the result from global hash table
// SHashObj* hashList; // hash list for quick access
SWindowResult** pResult; // result list
int16_t type; // data type for hash key
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
......@@ -180,6 +191,7 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pWindowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SWindowResultPool* pool; // window result object pool
} SQueryRuntimeEnv;
enum {
......
......@@ -71,7 +71,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);
void* taosDestoryFillInfo(SFillInfo *pFillInfo);
void* taosDestroyFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
......
......@@ -44,7 +44,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int
static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return &pWindowResInfo->pResult[slot];
return pWindowResInfo->pResult[slot];
}
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
......@@ -71,4 +71,14 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval);
__filter_func_t *getRangeFilterFuncArray(int32_t type);
__filter_func_t *getValueFilterFuncArray(int32_t type);
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv);
SWindowResultPool* initWindowResultPool(size_t size);
SWindowResult* getNewWindowResult(SWindowResultPool* p);
int64_t getWindowResultPoolMemSize(SWindowResultPool* p);
void* destroyWindowResultPool(SWindowResultPool* p);
int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p);
int32_t getNumOfUsedWindowResult(SWindowResultPool* p);
#endif // TDENGINE_QUERYUTIL_H
......@@ -469,30 +469,32 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
newCapacity = (int64_t)(pWindowResInfo->capacity * 1.5);
}
char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * sizeof(SWindowResult)));
pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SWindowResult);
pRuntimeEnv->summary.numOfTimeWindows += (newCapacity - pWindowResInfo->capacity);
char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES));
// pRuntimeEnv->summary.winInfoSize += (newCapacity - pWindowResInfo->capacity) * sizeof(SWindowResult);
// pRuntimeEnv->summary.numOfTimeWindows += (newCapacity - pWindowResInfo->capacity);
if (t == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pWindowResInfo->pResult = (SWindowResult *)t;
pWindowResInfo->pResult = (SWindowResult **)t;
int32_t inc = (int32_t)newCapacity - pWindowResInfo->capacity;
memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc);
memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, POINTER_BYTES * inc);
pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * inc;
for (int32_t i = pWindowResInfo->capacity; i < newCapacity; ++i) {
int32_t ret = createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
pWindowResInfo->capacity = (int32_t)newCapacity;
}
// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * inc;
SWindowResult* pResult = getNewWindowResult(pRuntimeEnv->pool);
pWindowResInfo->pResult[pWindowResInfo->size] = pResult;
// for (int32_t i = pWindowResInfo->capacity; i < newCapacity; ++i) {
int32_t ret = createQueryResultInfo(pQuery, pResult, pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
// }
pWindowResInfo->capacity = (int32_t)newCapacity;
}
// }
// add a new result set for a new group
pWindowResInfo->curIndex = pWindowResInfo->size++;
......@@ -632,7 +634,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
static bool getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size);
return pWindowResInfo->pResult[slot].closed;
return pWindowResInfo->pResult[slot]->closed;
}
static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos,
......@@ -691,7 +693,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
int64_t skey = TSKEY_INITIAL_VAL;
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
SWindowResult *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) {
numOfClosed += 1;
continue;
......@@ -715,7 +717,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
pWindowResInfo->curIndex = i;
}
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].win.skey;
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
// the number of completed slots are larger than the threshold, return current generated results to client.
if (numOfClosed > pWindowResInfo->threshold) {
......@@ -1756,7 +1758,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosTFree(pRuntimeEnv->pCtx);
}
pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo);
pRuntimeEnv->pFillInfo = taosDestroyFillInfo(pRuntimeEnv->pFillInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
......@@ -1767,6 +1769,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pWindowHashTable);
pRuntimeEnv->pWindowHashTable = NULL;
pRuntimeEnv->pool = destroyWindowResultPool(pRuntimeEnv->pool);
}
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
......@@ -3326,11 +3330,13 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize) {
int32_t numOfCols = pQuery->numOfOutput;
size_t size = numOfCols * sizeof(SResultInfo) + interBufSize;
pResultRow->resultInfo = calloc(1, size);
if (pResultRow->resultInfo == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
// size_t size = numOfCols * sizeof(SResultInfo) + interBufSize;
pResultRow->resultInfo = (SResultInfo*)((char*)pResultRow + sizeof(SWindowResult));
// pResultRow->resultInfo = calloc(1, size);
// if (pResultRow->resultInfo == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
pResultRow->pageId = -1;
pResultRow->rowId = -1;
......@@ -3698,7 +3704,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *buf = &pWindowResInfo->pResult[i];
SWindowResult *buf = pWindowResInfo->pResult[i];
if (!isWindowResClosed(pWindowResInfo, i)) {
continue;
}
......@@ -4009,7 +4015,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
int32_t totalSet = numOfClosedTimeWindow(pResultInfo);
SWindowResult* result = pResultInfo->pResult;
SWindowResult** result = pResultInfo->pResult;
if (orderType == TSDB_ORDER_ASC) {
startIdx = pQInfo->groupIndex;
......@@ -4022,13 +4028,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
if (result[i].numOfRows == 0) {
if (result[i]->numOfRows == 0) {
pQInfo->groupIndex += 1;
pGroupResInfo->rowId = 0;
continue;
}
int32_t numOfRowsToCopy = result[i].numOfRows - pGroupResInfo->rowId;
int32_t numOfRowsToCopy = result[i]->numOfRows - pGroupResInfo->rowId;
int32_t oldOffset = pGroupResInfo->rowId;
/*
......@@ -4043,13 +4049,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
pQInfo->groupIndex += 1;
}
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i].pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i]->pageId);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
char *out = pQuery->sdata[j]->data + numOfResult * size;
char *in = getPosInResultPage(pRuntimeEnv, j, &result[i], page);
char *in = getPosInResultPage(pRuntimeEnv, j, result[i], page);
memcpy(out, in + oldOffset * size, size * numOfRowsToCopy);
}
......@@ -4096,7 +4102,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
}
for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i];
SWindowResult *pResult = pRuntimeEnv->windowResInfo.pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pRuntimeEnv->pCtx[j].functionId;
......@@ -4251,31 +4257,22 @@ static void queryCostStatis(SQInfo *pQInfo) {
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pWindowHashTable);
hashSize += taosHashGetMemSize(pQInfo->tableqinfoGroupInfo.map);
int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* pa = GET_TABLEGROUP(pQInfo, i);
int32_t numOfTables = taosArrayGetSize(pa);
for(int32_t j = 0; j < numOfTables; ++j) {
// STableQueryInfo* pTableQueryInfo = taosArrayGetP(pa, j);
// hashSize += taosHashGetMemSize(pTableQueryInfo->windowResInfo.hashList);
}
}
pSummary->hashSize = hashSize;
// add the merge time
pSummary->elapsedTime += pSummary->firstStageMergeTime;
SWindowResultPool* p = pQInfo->runtimeEnv.pool;
pSummary->winInfoSize = getWindowResultPoolMemSize(p);
pSummary->numOfTimeWindows = getNumOfAllocatedWindowResult(p);
qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pQInfo, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
qDebug("QInfo:%p :cost summary: windowInfo size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo, pSummary->winInfoSize/1024.0,
qDebug("QInfo:%p :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo, pSummary->winInfoSize/1024.0,
pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
}
......@@ -5034,9 +5031,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
pWindowResInfo->pResult[i].closed = true; // enable return all results for group by normal columns
pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns
SWindowResult *pResult = &pWindowResInfo->pResult[i];
SWindowResult *pResult = pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes));
}
......@@ -6337,6 +6334,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->runtimeEnv.pWindowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW);
pQInfo->runtimeEnv.pool = initWindowResultPool(getWindowResultSize(&pQInfo->runtimeEnv));
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pQInfo->pBuf == NULL) {
......
......@@ -91,7 +91,7 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
pFillInfo->numOfTotal = 0;
}
void* taosDestoryFillInfo(SFillInfo* pFillInfo) {
void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
if (pFillInfo == NULL) {
return NULL;
}
......
......@@ -14,8 +14,9 @@
*/
#include "os.h"
#include "hash.h"
#include "taosmsg.h"
#include "hash.h"
#include "qExecutor.h"
#include "qUtil.h"
......@@ -40,38 +41,29 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->size = 0;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
SQueryCostInfo* pSummary = &pRuntimeEnv->summary;
// SQueryCostInfo* pSummary = &pRuntimeEnv->summary;
// use the pointer arraylist
pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, sizeof(SWindowResult));
pWindowResInfo->pResult = calloc(pWindowResInfo->capacity, POINTER_BYTES);
if (pWindowResInfo->pResult == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
pWindowResInfo->interval = pRuntimeEnv->pQuery->interval.interval;
pSummary->winInfoSize += sizeof(SWindowResult) * pWindowResInfo->capacity;
pSummary->winInfoSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity;
pSummary->numOfTimeWindows = pWindowResInfo->capacity;
// pSummary->winInfoSize += POINTER_BYTES * pWindowResInfo->capacity;
// pSummary->winInfoSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity;
// pSummary->numOfTimeWindows = pWindowResInfo->capacity;
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
// int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// }
return TSDB_CODE_SUCCESS;
}
void destroyTimeWindowRes(SWindowResult *pWindowRes) {
if (pWindowRes == NULL) {
return;
}
free(pWindowRes->resultInfo);
}
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
if (pWindowResInfo == NULL) {
return;
......@@ -81,13 +73,6 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
return;
}
if (pWindowResInfo->pResult != NULL) {
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
destroyTimeWindowRes(&pWindowResInfo->pResult[i]);
}
}
// taosHashCleanup(pWindowResInfo->hashList);
taosTFree(pWindowResInfo->pResult);
}
......@@ -97,17 +82,13 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pWindowRes = &pWindowResInfo->pResult[i];
SWindowResult *pWindowRes = pWindowResInfo->pResult[i];
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
}
pWindowResInfo->curIndex = -1;
// taosHashCleanup(pWindowResInfo->hashList);
pWindowResInfo->size = 0;
// _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
// pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false);
pWindowResInfo->startTime = TSKEY_INITIAL_VAL;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
}
......@@ -127,7 +108,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
int16_t bytes = -1;
for (int32_t i = 0; i < num; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
SWindowResult *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) { // remove the window slot from hash table
// todo refactor
......@@ -150,19 +131,19 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
// clear all the closed windows from the window list
for (int32_t k = 0; k < remain; ++k) {
copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[num + k]);
copyTimeWindowResBuf(pRuntimeEnv, pWindowResInfo->pResult[k], pWindowResInfo->pResult[num + k]);
}
// move the unclosed window in the front of the window list
for (int32_t k = remain; k < pWindowResInfo->size; ++k) {
SWindowResult *pWindowRes = &pWindowResInfo->pResult[k];
SWindowResult *pWindowRes = pWindowResInfo->pResult[k];
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
}
pWindowResInfo->size = remain;
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
SWindowResult *pResult = &pWindowResInfo->pResult[k];
SWindowResult *pResult = pWindowResInfo->pResult[k];
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
key = varDataVal(pResult->key);
......@@ -198,7 +179,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) {
int32_t i = 0;
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i].closed) {
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i]->closed) {
++i;
}
......@@ -209,11 +190,11 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
if (pWindowResInfo->pResult[i].closed) {
if (pWindowResInfo->pResult[i]->closed) {
continue;
}
pWindowResInfo->pResult[i].closed = true;
pWindowResInfo->pResult[i]->closed = true;
}
}
......@@ -229,19 +210,19 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
// get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0].win.skey < pWindowResInfo->pResult[1].win.skey)? 1:-1;
int32_t resultOrder = (pWindowResInfo->pResult[0]->win.skey < pWindowResInfo->pResult[1]->win.skey)? 1:-1;
if (order != resultOrder) {
return;
}
int32_t i = 0;
if (order == QUERY_ASC_FORWARD_STEP) {
TSKEY ekey = pWindowResInfo->pResult[i].win.ekey;
TSKEY ekey = pWindowResInfo->pResult[i]->win.ekey;
while (i < pWindowResInfo->size && (ekey < lastKey)) {
++i;
}
} else if (order == QUERY_DESC_FORWARD_STEP) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].win.skey > lastKey)) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i]->win.skey > lastKey)) {
++i;
}
}
......@@ -318,3 +299,77 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
}
}
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) {
return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo)) + pRuntimeEnv->interBufSize + sizeof(SWindowResult);
}
SWindowResultPool* initWindowResultPool(size_t size) {
SWindowResultPool* p = calloc(1, sizeof(SWindowResultPool));
if (p == NULL) {
return NULL;
}
p->numOfElemPerBlock = 128;
p->elemSize = size;
p->blockSize = p->numOfElemPerBlock * p->elemSize;
p->position.pos = 0;
p->pData = taosArrayInit(8, POINTER_BYTES);
return p;
}
SWindowResult* getNewWindowResult(SWindowResultPool* p) {
if (p == NULL) {
return NULL;
}
void* ptr = NULL;
if (p->position.pos == 0) {
ptr = calloc(1, p->blockSize);
taosArrayPush(p->pData, &ptr);
} else {
size_t last = taosArrayGetSize(p->pData);
void** pBlock = taosArrayGet(p->pData, last - 1);
ptr = (*pBlock) + p->elemSize * p->position.pos;
}
p->position.pos = (p->position.pos + 1)%p->numOfElemPerBlock;
return ptr;
}
int64_t getWindowResultPoolMemSize(SWindowResultPool* p) {
if (p == NULL) {
return 0;
}
return taosArrayGetSize(p->pData) * p->blockSize;
}
int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p) {
return taosArrayGetSize(p->pData) * p->numOfElemPerBlock;
}
int32_t getNumOfUsedWindowResult(SWindowResultPool* p) {
return getNumOfAllocatedWindowResult(p) - p->numOfElemPerBlock + p->position.pos;
}
void* destroyWindowResultPool(SWindowResultPool* p) {
if (p == NULL) {
return NULL;
}
size_t size = taosArrayGetSize(p->pData);
for(int32_t i = 0; i < size; ++i) {
void** ptr = taosArrayGet(p->pData, i);
taosTFree(*ptr);
}
taosArrayDestroy(p->pData);
taosTFree(p);
return NULL;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册